Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-36412: Try to set the datastore cache directory explicitly #206

Merged
merged 3 commits into from
Oct 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions doc/changes/DM-36412.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
If ``pipetask`` is run with multiple processes and if a butler datastore cache is configured, all subprocesses will now share the same cache.
For large numbers of simultaneous processes it may be necessary to significantly increase the number of datasets allowed in the cache to prevent cached files from being evicted while they are still needed.
This can be done by using the ``$DAF_BUTLER_CACHE_EXPIRATION_MODE`` environment variable.

Previously each subprocess would get its own cache and if ``fork`` start method was used these cache directories would not be cleaned up.
24 changes: 20 additions & 4 deletions python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,17 @@

__all__ = ["CmdLineFwk"]

# -------------------------------
# Imports of standard modules --
# -------------------------------
import atexit
import copy
import datetime
import getpass
import logging
import shutil
from collections.abc import Iterable, Sequence
from types import SimpleNamespace
from typing import Optional, Tuple

from lsst.daf.butler import Butler, CollectionType, DatasetRef, Registry
from lsst.daf.butler import Butler, CollectionType, DatasetRef, DatastoreCacheManager, Registry
from lsst.daf.butler.registry import MissingCollectionError, RegistryDefaults
from lsst.daf.butler.registry.wildcards import CollectionWildcard
from lsst.pipe.base import (
Expand Down Expand Up @@ -317,6 +316,7 @@ def makeReadButler(cls, args: SimpleNamespace) -> Butler:
A read-only butler initialized with the collections specified by
``args``.
"""
cls.defineDatastoreCache() # Ensure that this butler can use a shared cache.
butler, inputs, _ = cls._makeReadParts(args)
_LOG.debug("Preparing butler to read from %s.", inputs)
return Butler(butler=butler, collections=inputs)
Expand Down Expand Up @@ -351,6 +351,21 @@ def makeButlerAndCollections(cls, args: SimpleNamespace) -> Tuple[Butler, Sequen
_LOG.debug("Preparing registry to read from %s and expect future writes to '%s'.", inputs, run)
return butler, inputs, run

@staticmethod
def defineDatastoreCache() -> None:
    """Declare the directory to use for shared datastore caching.

    Notes
    -----
    When multiple subprocesses are launched they should all share a
    single datastore cache rather than each creating their own. This
    method requests a fallback cache directory from the datastore
    cache manager; if one was newly created by this call, an ``atexit``
    hook is registered so the directory is removed when the process
    exits. If a fallback directory was already defined, nothing is
    done (and no cleanup is scheduled) because this process does not
    own it.
    """
    newly_defined, cache_dir = DatastoreCacheManager.set_fallback_cache_directory_if_unset()
    if not newly_defined:
        # Someone else already configured (and owns) the directory.
        return
    _LOG.debug("Defining shared datastore cache directory to %s", cache_dir)
    # We created the directory, so we are responsible for removing it
    # at interpreter shutdown; ignore_errors avoids noise if it has
    # already been cleaned up.
    atexit.register(shutil.rmtree, cache_dir, ignore_errors=True)

@classmethod
def makeWriteButler(cls, args: SimpleNamespace, taskDefs: Optional[Iterable[TaskDef]] = None) -> Butler:
"""Return a read-write butler initialized to write to and read from
Expand All @@ -371,6 +386,7 @@ def makeWriteButler(cls, args: SimpleNamespace, taskDefs: Optional[Iterable[Task
butler : `lsst.daf.butler.Butler`
A read-write butler initialized according to the given arguments.
"""
cls.defineDatastoreCache() # Ensure that this butler can use a shared cache.
butler = Butler(args.butler_config, writeable=True)
self = cls(butler.registry, args, writeable=True)
self.check(args)
Expand Down
4 changes: 2 additions & 2 deletions tests/test_cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,10 +505,10 @@ def testEmptyQGraph(self):
fwk = CmdLineFwk()
with self.assertLogs(level=logging.CRITICAL) as cm:
qgraph = fwk.makeGraph(self.pipeline, args)
self.assertRegexpMatches(
self.assertRegex(
cm.output[0], ".*Initial data ID query returned no rows, so QuantumGraph will be empty.*"
)
self.assertRegexpMatches(cm.output[1], ".*No datasets.*bad_input.*")
self.assertRegex(cm.output[1], ".*No datasets.*bad_input.*")
self.assertIsNone(qgraph)

def testSimpleQGraphNoSkipExisting_inputs(self):
Expand Down