Skip to content

Commit

Permalink
Merge pull request #935 from lsst/tickets/DM-42317
Browse files Browse the repository at this point in the history
DM-42317: Make cloned Butler instances threadsafe
  • Loading branch information
dhirving committed Jan 18, 2024
2 parents 5f0743b + 1fab958 commit acb4c37
Show file tree
Hide file tree
Showing 42 changed files with 822 additions and 228 deletions.
5 changes: 5 additions & 0 deletions doc/changes/DM-42317.api.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
The ``Datastore`` base class was changed so that subclasses are no longer
required to have the same constructor parameters as the base class. Subclasses
are now required to implement ``_create_from_config`` for creating an instance
from the ``Datastore.fromConfig`` static method, and ``clone`` for creating a
copy of an existing instance.
1 change: 1 addition & 0 deletions doc/changes/DM-42317.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed miscellaneous thread-safety issues in DimensionUniverse, DimensionGroup, and StorageClassFactory.
1 change: 1 addition & 0 deletions doc/changes/DM-42317.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"Cloned" Butler instances returned from ``Butler(butler=otherButler)`` and ``LabeledButlerFactory`` no longer share internal state with their parent instance. This makes it safe to use the new instance concurrently with the original in separate threads. It is still unsafe to use a single ``Butler`` instance concurrently from multiple threads.
25 changes: 14 additions & 11 deletions python/lsst/daf/butler/_labeled_butler_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,11 @@ class LabeledButlerFactory:
For each label in the repository index, caches shared state to allow fast
instantiation of new instances.
Instance methods on this class are threadsafe. A single instance of
`LabeledButlerFactory` can be shared between multiple threads. Note that
``DirectButler`` itself is not currently threadsafe, so this guarantee does
not buy you much. See DM-42317.
Instance methods on this class are threadsafe -- a single instance of
`LabeledButlerFactory` can be used concurrently by multiple threads. It is
NOT safe for a single `Butler` instance returned by this factory to be used
concurrently by multiple threads. However, separate `Butler` instances can
safely be used by separate threads.
"""

def __init__(self, repositories: Mapping[str, str] | None = None) -> None:
Expand Down Expand Up @@ -146,18 +147,20 @@ def _get_config_uri(self, label: str) -> ResourcePathExpression:


def _create_direct_butler_factory(config: ButlerConfig) -> _FactoryFunction:
    """Build a factory function that hands out clones of a template Butler.

    Parameters
    ----------
    config : `ButlerConfig`
        Configuration used to construct the template Butler.

    Returns
    -------
    factory : `_FactoryFunction`
        Callable taking an access token (ignored) and returning a new clone
        of the template Butler on every call.
    """
    import lsst.daf.butler.direct_butler

    # Create a 'template' Butler that will be cloned when callers request an
    # instance.
    butler = Butler.from_config(config)
    # The private ``_preload_cache``/``_clone`` calls below are made on the
    # assumption that this is a DirectButler, so check that up front.
    assert isinstance(butler, lsst.daf.butler.direct_butler.DirectButler)

    # Load caches so that data is available in cloned instances without
    # needing to refetch it from the database for every instance.
    butler._preload_cache()

    def create_butler(access_token: str | None) -> Butler:
        # Access token is ignored because DirectButler does not use Gafaelfawr
        # authentication.
        return butler._clone()

    return create_butler
Expand Down
123 changes: 57 additions & 66 deletions python/lsst/daf/butler/_storage_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,8 @@
import itertools
import logging
from collections import ChainMap
from collections.abc import (
Callable,
Collection,
ItemsView,
Iterator,
KeysView,
Mapping,
Sequence,
Set,
ValuesView,
)
from collections.abc import Callable, Collection, Mapping, Sequence, Set
from threading import RLock
from typing import Any

from lsst.utils import doImportType
Expand Down Expand Up @@ -641,6 +632,7 @@ class StorageClassFactory(metaclass=Singleton):
def __init__(self, config: StorageClassConfig | str | None = None):
self._storageClasses: dict[str, StorageClass] = {}
self._configs: list[StorageClassConfig] = []
self._lock = RLock()

# Always seed with the default config
self.addFromConfig(StorageClassConfig())
Expand All @@ -656,8 +648,9 @@ def __str__(self) -> str:
summary : `str`
Summary of the factory status.
"""
sep = "\n"
return f"""Number of registered StorageClasses: {len(self._storageClasses)}
with self._lock:
sep = "\n"
return f"""Number of registered StorageClasses: {len(self._storageClasses)}
StorageClasses
--------------
Expand Down Expand Up @@ -687,26 +680,15 @@ def __contains__(self, storageClassOrName: StorageClass | str) -> bool:
StorageClass.name to be in the factory but StorageClass to not be
in the factory.
"""
if isinstance(storageClassOrName, str):
return storageClassOrName in self._storageClasses
elif isinstance(storageClassOrName, StorageClass) and storageClassOrName.name in self._storageClasses:
return storageClassOrName == self._storageClasses[storageClassOrName.name]
return False

def __len__(self) -> int:
return len(self._storageClasses)

def __iter__(self) -> Iterator[str]:
return iter(self._storageClasses)

def values(self) -> ValuesView[StorageClass]:
return self._storageClasses.values()

def keys(self) -> KeysView[str]:
return self._storageClasses.keys()

def items(self) -> ItemsView[str, StorageClass]:
return self._storageClasses.items()
with self._lock:
if isinstance(storageClassOrName, str):
return storageClassOrName in self._storageClasses
elif (
isinstance(storageClassOrName, StorageClass)
and storageClassOrName.name in self._storageClasses
):
return storageClassOrName == self._storageClasses[storageClassOrName.name]
return False

def addFromConfig(self, config: StorageClassConfig | Config | str) -> None:
"""Add more `StorageClass` definitions from a config file.
Expand All @@ -718,7 +700,6 @@ def addFromConfig(self, config: StorageClassConfig | Config | str) -> None:
key if part of a global configuration.
"""
sconfig = StorageClassConfig(config)
self._configs.append(sconfig)

# Since we can not assume that we will get definitions of
# components or parents before their classes are defined
Expand Down Expand Up @@ -793,8 +774,10 @@ def processStorageClass(name: str, _sconfig: StorageClassConfig, msg: str = "")
context = f"when adding definitions from {', '.join(files)}" if files else ""
log.debug("Adding definitions from config %s", ", ".join(files))

for name in list(sconfig.keys()):
processStorageClass(name, sconfig, context)
with self._lock:
self._configs.append(sconfig)
for name in list(sconfig.keys()):
processStorageClass(name, sconfig, context)

@staticmethod
def makeNewStorageClass(
Expand Down Expand Up @@ -872,7 +855,8 @@ def getStorageClass(self, storageClassName: str) -> StorageClass:
KeyError
The requested storage class name is not registered.
"""
return self._storageClasses[storageClassName]
with self._lock:
return self._storageClasses[storageClassName]

def findStorageClass(self, pytype: type, compare_types: bool = False) -> StorageClass:
"""Find the storage class associated with this python type.
Expand Down Expand Up @@ -903,18 +887,21 @@ def findStorageClass(self, pytype: type, compare_types: bool = False) -> Storage
storage classes. This method will currently return the first that
matches.
"""
result = self._find_storage_class(pytype, False)
if result:
return result

if compare_types:
# The fast comparison failed and we were asked to try the
# variant that might involve code imports.
result = self._find_storage_class(pytype, True)
with self._lock:
result = self._find_storage_class(pytype, False)
if result:
return result

raise KeyError(f"Unable to find a StorageClass associated with type {get_full_type_name(pytype)!r}")
if compare_types:
# The fast comparison failed and we were asked to try the
# variant that might involve code imports.
result = self._find_storage_class(pytype, True)
if result:
return result

raise KeyError(
f"Unable to find a StorageClass associated with type {get_full_type_name(pytype)!r}"
)

def _find_storage_class(self, pytype: type, compare_types: bool) -> StorageClass | None:
"""Iterate through all storage classes to find a match.
Expand All @@ -936,10 +923,11 @@ def _find_storage_class(self, pytype: type, compare_types: bool) -> StorageClass
-----
Helper method for ``findStorageClass``.
"""
for storageClass in self.values():
if storageClass.is_type(pytype, compare_types=compare_types):
return storageClass
return None
with self._lock:
for storageClass in self._storageClasses.values():
if storageClass.is_type(pytype, compare_types=compare_types):
return storageClass
return None

def registerStorageClass(self, storageClass: StorageClass, msg: str | None = None) -> None:
"""Store the `StorageClass` in the factory.
Expand All @@ -960,19 +948,20 @@ def registerStorageClass(self, storageClass: StorageClass, msg: str | None = Non
If a storage class has already been registered with
that storage class name and the previous definition differs.
"""
if storageClass.name in self._storageClasses:
existing = self.getStorageClass(storageClass.name)
if existing != storageClass:
errmsg = f" {msg}" if msg else ""
raise ValueError(
f"New definition for StorageClass {storageClass.name} ({storageClass!r}) "
f"differs from current definition ({existing!r}){errmsg}"
)
if type(existing) is StorageClass and type(storageClass) is not StorageClass:
# Replace generic with specialist subclass equivalent.
with self._lock:
if storageClass.name in self._storageClasses:
existing = self.getStorageClass(storageClass.name)
if existing != storageClass:
errmsg = f" {msg}" if msg else ""
raise ValueError(
f"New definition for StorageClass {storageClass.name} ({storageClass!r}) "
f"differs from current definition ({existing!r}){errmsg}"
)
if type(existing) is StorageClass and type(storageClass) is not StorageClass:
# Replace generic with specialist subclass equivalent.
self._storageClasses[storageClass.name] = storageClass
else:
self._storageClasses[storageClass.name] = storageClass
else:
self._storageClasses[storageClass.name] = storageClass

def _unregisterStorageClass(self, storageClassName: str) -> None:
"""Remove the named StorageClass from the factory.
Expand All @@ -992,14 +981,16 @@ def _unregisterStorageClass(self, storageClassName: str) -> None:
This method is intended to simplify testing of StorageClassFactory
functionality and it is not expected to be required for normal usage.
"""
del self._storageClasses[storageClassName]
with self._lock:
del self._storageClasses[storageClassName]

def reset(self) -> None:
    """Remove all storage class entries from factory and reset to
    initial state.

    This is useful for test code where a known start state is useful.
    """
    # Hold the lock across the whole sequence so that other threads never
    # observe the cleared-but-not-yet-reseeded state.  ``addFromConfig``
    # acquires the same lock again, which is safe because it is an `RLock`.
    with self._lock:
        self._storageClasses.clear()
        # Forget previously recorded configs too; otherwise every reset
        # leaves the old entries behind and appends one more default
        # config, so the factory never returns to its initial state.
        self._configs.clear()
        # Seed with the default config.
        self.addFromConfig(StorageClassConfig())
59 changes: 51 additions & 8 deletions python/lsst/daf/butler/datastore/_datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,6 @@ class Datastore(metaclass=ABCMeta):
referring to a configuration file.
bridgeManager : `DatastoreRegistryBridgeManager`
Object that manages the interface between `Registry` and datastores.
butlerRoot : `str`, optional
New datastore root to use to override the configuration value.
"""

defaultConfigFile: ClassVar[str | None] = None
Expand Down Expand Up @@ -381,18 +379,18 @@ def fromConfig(
butlerRoot : `str`, optional
Butler root directory.
"""
cls = doImportType(config["datastore", "cls"])
config = DatastoreConfig(config)
cls = doImportType(config["cls"])
if not issubclass(cls, Datastore):
raise TypeError(f"Imported child class {config['datastore', 'cls']} is not a Datastore")
return cls(config=config, bridgeManager=bridgeManager, butlerRoot=butlerRoot)
raise TypeError(f"Imported child class {config['cls']} is not a Datastore")
return cls._create_from_config(config=config, bridgeManager=bridgeManager, butlerRoot=butlerRoot)

def __init__(
self,
config: Config | ResourcePathExpression,
config: DatastoreConfig,
bridgeManager: DatastoreRegistryBridgeManager,
butlerRoot: ResourcePathExpression | None = None,
):
self.config = DatastoreConfig(config)
self.config = config
self.name = "ABCDataStore"
self._transaction: DatastoreTransaction | None = None

Expand All @@ -403,6 +401,39 @@ def __init__(
constraintsConfig = self.config.get("constraints")
self.constraints = Constraints(constraintsConfig, universe=bridgeManager.universe)

@classmethod
@abstractmethod
def _create_from_config(
    cls,
    config: DatastoreConfig,
    bridgeManager: DatastoreRegistryBridgeManager,
    butlerRoot: ResourcePathExpression | None,
) -> Datastore:
    """Construct an instance of this `Datastore` subclass from
    configuration.

    `Datastore.fromConfig` delegates to this method once it has imported
    the configured class, which makes it the primary constructor for each
    concrete subclass.

    Parameters
    ----------
    config : `DatastoreConfig`
        Datastore configuration.
    bridgeManager : `DatastoreRegistryBridgeManager`
        Object that manages the interface between `Registry` and
        datastores.
    butlerRoot : `ResourcePathExpression` or `None`
        Butler root directory.

    Returns
    -------
    datastore : `Datastore`
        The newly-constructed instance.

    Notes
    -----
    Abstract: the base implementation always raises `NotImplementedError`.
    """
    raise NotImplementedError()

@abstractmethod
def clone(self, bridgeManager: DatastoreRegistryBridgeManager) -> Datastore:
    """Create a separate `Datastore` that uses a different
    `DatastoreRegistryBridgeManager`.

    Parameters
    ----------
    bridgeManager : `DatastoreRegistryBridgeManager`
        Bridge manager instance that the copy should use when
        instantiating managers.

    Returns
    -------
    datastore : `Datastore`
        Independent copy configured identically to this instance.

    Notes
    -----
    Abstract: the base implementation always raises `NotImplementedError`,
    so concrete subclasses have to supply their own version.
    """
    raise NotImplementedError()

def __str__(self) -> str:
return self.name

Expand Down Expand Up @@ -1322,6 +1353,18 @@ class NullDatastore(Datastore):
Ignored.
"""

@classmethod
def _create_from_config(
    cls,
    config: Config,
    bridgeManager: DatastoreRegistryBridgeManager,
    butlerRoot: ResourcePathExpression | None = None,
) -> NullDatastore:
    """Construct a `NullDatastore` from the supplied configuration.

    Parameters
    ----------
    config : `Config`
        Configuration forwarded to the constructor.
    bridgeManager : `DatastoreRegistryBridgeManager`
        Bridge manager forwarded to the constructor.
    butlerRoot : `ResourcePathExpression` or `None`, optional
        Butler root forwarded to the constructor.

    Returns
    -------
    datastore : `NullDatastore`
        The newly-constructed instance.
    """
    # NOTE(review): this names NullDatastore explicitly instead of using
    # ``cls``, so a subclass that does not override this method would get
    # a plain NullDatastore back -- confirm that is intended.
    instance = NullDatastore(config, bridgeManager, butlerRoot)
    return instance

def clone(self, bridgeManager: DatastoreRegistryBridgeManager) -> Datastore:
    """Return this same instance in place of a copy.

    Parameters
    ----------
    bridgeManager : `DatastoreRegistryBridgeManager`
        Not used by this implementation.

    Returns
    -------
    datastore : `Datastore`
        This very object; no new instance is created.
    """
    # NOTE(review): presumably sharing is fine because this class holds no
    # per-instance state worth isolating -- confirm against the class body.
    same_instance = self
    return same_instance

@classmethod
def setConfigRoot(cls, root: str, config: Config, full: Config, overwrite: bool = True) -> None:
# Nothing to do. This is not a real Datastore.
Expand Down

0 comments on commit acb4c37

Please sign in to comment.