Merged
18 changes: 11 additions & 7 deletions python/activator/activator.py
@@ -35,7 +35,7 @@

from .logger import setup_usdf_logger
from .make_pgpass import make_pgpass
from .middleware_interface import get_central_butler, MiddlewareInterface
from .middleware_interface import get_central_butler, make_local_repo, MiddlewareInterface
from .raw import Snap
from .visit import Visit

@@ -84,12 +84,9 @@

storage_client = boto3.client('s3', endpoint_url=s3_endpoint)

# Initialize middleware interface.
mwi = MiddlewareInterface(get_central_butler(calib_repo, instrument_name),
image_bucket,
instrument_name,
skymap,
local_repos)
central_butler = get_central_butler(calib_repo, instrument_name)
# local_repo is a temporary directory with the same lifetime as this process.
local_repo = make_local_repo(local_repos, central_butler, instrument_name)
Member:

I can imagine why we can't use a context manager for cleanup here; do we need to worry about that, or is the lifetime of this process tied to some kind of more complete reset of the compute resource that will take care of that?

Member Author:

For performance reasons, we want to reuse the same local repo for processing multiple exposures. And AFAIK Flask doesn't let us pass in arguments to next_visit_handler, so it needs to be (directly or indirectly) a global. I think that precludes use of a context manager.

Basically, the way the Prompt Processing architecture is set up, the Python process represented by activator.py has the same lifetime as the (virtual) filesystem it's running on. I don't know if that answers your second question.
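Below is a minimal sketch of the module-global arrangement described in this thread. The configuration values and the atexit hook are assumptions for illustration; the PR itself relies on process teardown, since (per the comment above) the process and its filesystem die together.

import atexit

from activator.middleware_interface import get_central_butler, make_local_repo

# Assumed configuration; in activator.py these come from environment variables.
calib_repo = "s3://example-bucket/central-repo"
instrument_name = "LsstCam"
local_repos = "/tmp"

# Module-level state, shared by every call to next_visit_handler in this
# process, because Flask offers no way to pass arguments to the handler.
central_butler = get_central_butler(calib_repo, instrument_name)
local_repo = make_local_repo(local_repos, central_butler, instrument_name)

# A context manager cannot scope a global, but deterministic cleanup is still
# available by registering the TemporaryDirectory's cleanup at interpreter
# exit (hypothetical; not part of this PR).
atexit.register(local_repo.cleanup)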



def check_for_snap(
@@ -232,6 +229,13 @@ def next_visit_handler() -> Tuple[str, int]:
f"Expected {instrument_name}, received {expected_visit.instrument}."
expid_set = set()

# Create a fresh MiddlewareInterface object to avoid accidental
# "cross-talk" between different visits.
mwi = MiddlewareInterface(central_butler,
image_bucket,
instrument_name,
skymap,
local_repo.name)
# Copy calibrations for this detector/visit
mwi.prep_butler(expected_visit)

137 changes: 84 additions & 53 deletions python/activator/middleware_interface.py
@@ -19,7 +19,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

__all__ = ["get_central_butler", "MiddlewareInterface"]
__all__ = ["get_central_butler", "make_local_repo", "MiddlewareInterface"]

import collections.abc
import datetime
@@ -34,7 +34,7 @@
from lsst.utils import getPackageDir
from lsst.resources import ResourcePath
import lsst.afw.cameraGeom
from lsst.ctrl.mpexec import SimplePipelineExecutor
from lsst.ctrl.mpexec import SeparablePipelineExecutor
from lsst.daf.butler import Butler, CollectionType
import lsst.geom
from lsst.meas.algorithms.htmIndexer import HtmIndexer
@@ -76,6 +76,53 @@ def get_central_butler(central_repo: str, instrument_class: str):
)


def make_local_repo(local_storage: str, central_butler: Butler, instrument: str):
"""Create and configure a new local repository.

The repository is represented by a temporary directory object, which can be
used to manage its lifetime.

Parameters
----------
local_storage : `str`
An absolute path to a space where this function can create a local
Butler repo.
central_butler : `lsst.daf.butler.Butler`
Butler repo containing instrument and skymap definitions.
instrument : `str`
Name of the instrument taking the data, for populating
butler collections and dataIds. May be either the fully qualified class
name or the short name. Examples: "LsstCam", "lsst.obs.lsst.LsstCam".

Returns
-------
repo_dir
An object of the same type as returned by `tempfile.TemporaryDirectory`,
pointing to the local repo location.
"""
repo_dir = tempfile.TemporaryDirectory(dir=local_storage, prefix="butler-")
butler = Butler(Butler.makeRepo(repo_dir.name), writeable=True)
_log.info("Created local Butler repo at %s.", repo_dir.name)

# Run-once repository initialization

instrument = lsst.obs.base.Instrument.from_string(instrument, central_butler.registry)
instrument.register(butler.registry)

butler.registry.registerCollection(instrument.makeUmbrellaCollectionName(),
CollectionType.CHAINED)
butler.registry.registerCollection(instrument.makeDefaultRawIngestRunName(),
CollectionType.RUN)
output_collection = instrument.makeCollectionName("prompt")
butler.registry.registerCollection(output_collection, CollectionType.CHAINED)
collections = [instrument.makeUmbrellaCollectionName(),
instrument.makeDefaultRawIngestRunName(),
]
butler.registry.setCollectionChain(output_collection, collections)

return repo_dir
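
# Usage sketch (assumed; it mirrors test_make_local_repo below): the returned
# TemporaryDirectory works as a context manager, so short-lived callers can
# write
#
#     with make_local_repo(tempfile.gettempdir(), central_butler, "LsstCam") as repo_dir:
#         butler = Butler(repo_dir, writeable=True)
#
# while long-lived callers such as activator.py keep the object and call its
# cleanup() (or rely on process teardown) to delete the repo.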


class MiddlewareInterface:
"""Interface layer between the Butler middleware and the prompt processing
data handling system, to handle processing individual images.
@@ -86,15 +133,13 @@ class MiddlewareInterface:
ingest the data when it is available, and run the difference imaging
pipeline, all in that local butler.

Each instance may be used for processing more than one group-detector
combination, designated by the `Visit` parameter to certain methods. There
is no guarantee that a processing run may, or may not, share a group,
detector, or both with a previous run handled by the same object.
Each instance must be used for processing only one group-detector
combination. The object may contain state that is unique to a particular
processing run.

``MiddlewareInterface`` objects are not thread- or process-safe, and must
not share any state with other instances (see ``butler`` in the parameter
list). They may, however, share a ``central_butler``, and concurrent
operations on this butler are guaranteed to be appropriately synchronized.
``MiddlewareInterface`` objects are not thread- or process-safe. It is up
to the client to avoid conflicts from multiple objects trying to access the
same local repo.

Parameters
----------
@@ -113,9 +158,9 @@ class MiddlewareInterface:
use of the butler.
skymap: `str`
Name of the skymap in the central repo for querying templates.
local_storage : `str`
An absolute path to a space where this object can create a local
Butler repo. The repo is guaranteed to be unique to this object.
local_repo : `str`
A URI to the local Butler repo, which is assumed to already exist and
contain standard collections and the registration of ``instrument``.
prefix : `str`, optional
URI scheme followed by ``://``; prepended to ``image_bucket`` when
constructing URIs to retrieve incoming files. The default is
@@ -132,15 +177,15 @@
# self._download_store is None if and only if self.image_host is a local URI.
# self.instrument, self.camera, self.skymap, self._deployment do not change
# after __init__.
# self._repo is the only reference to its TemporaryDirectory object.
# self.butler defaults to a chained collection named
# self.output_collection, which contains zero or more output runs,
# pre-made inputs, and raws, in that order. However, self.butler is not
# self.output_collection, which contains zero or more output runs
# and all pipeline inputs, in that order. However, self.butler is not
# guaranteed to contain concrete data, or even the dimensions
# corresponding to self.camera and self.skymap.
# corresponding to self.camera and self.skymap. Do not assume that
# self.butler is the only Butler pointing to the local repo.

def __init__(self, central_butler: Butler, image_bucket: str, instrument: str,
skymap: str, local_storage: str,
skymap: str, local_repo: str,
prefix: str = "s3://"):
# Deployment/version ID -- potentially expensive to generate.
self._deployment = self._get_deployment()
@@ -158,7 +203,7 @@ def __init__(self, central_butler: Butler, image_bucket: str, instrument: str,

self.output_collection = self.instrument.makeCollectionName("prompt")

self._init_local_butler(local_storage)
self._init_local_butler(local_repo)
self._init_ingester()
self._init_visit_definer()

@@ -206,44 +251,24 @@ def _make_apdb_uri(self):
user_apdb = os.environ.get("USER_APDB", "postgres")
return f"postgresql://{user_apdb}@{ip_apdb}/{db_apdb}"

def _init_local_butler(self, base_path: str):
def _init_local_butler(self, repo_uri: str):
"""Prepare the local butler to ingest into and process from.

``self.instrument`` must already exist. ``self.butler`` is correctly
initialized after this method returns, and is guaranteed to be unique
to this object.
initialized after this method returns.

Parameters
----------
base_path : `str`
An absolute path to a space where the repo can be created.
repo_uri : `str`
A URI to the location of the local repository.
"""
# Directory has same lifetime as this object.
self._repo = tempfile.TemporaryDirectory(dir=base_path, prefix="butler-")
butler = Butler(Butler.makeRepo(self._repo.name), writeable=True)
_log.info("Created local Butler repo at %s.", self._repo.name)

self.instrument.register(butler.registry)

# Will be populated in prep_butler.
butler.registry.registerCollection(self.instrument.makeUmbrellaCollectionName(),
CollectionType.CHAINED)
# Will be populated on ingest.
butler.registry.registerCollection(self.instrument.makeDefaultRawIngestRunName(),
CollectionType.RUN)
# Will be populated on pipeline execution.
butler.registry.registerCollection(self.output_collection, CollectionType.CHAINED)
collections = [self.instrument.makeUmbrellaCollectionName(),
self.instrument.makeDefaultRawIngestRunName(),
]
butler.registry.setCollectionChain(self.output_collection, collections)

# Internal Butler keeps a reference to the newly prepared collection.
# This reference makes visible any inputs for query purposes. Output
# runs are execution-specific and must be provided explicitly to the
# appropriate calls.
self.butler = Butler(butler=butler,
self.butler = Butler(repo_uri,
collections=[self.output_collection],
writeable=True,
)

def _init_ingester(self):
@@ -358,9 +383,10 @@ def prep_butler(self, visit: Visit) -> None:
wcs = self._predict_wcs(detector, visit)
center, radius = self._detector_bounding_circle(detector, wcs)

# central repo may have been modified by other MWI instances.
# repos may have been modified by other MWI instances.
# TODO: get a proper synchronization API for Butler
self.central_butler.registry.refresh()
self.butler.registry.refresh()
Member:

I'm curious what this refresh and the one below are doing for you, especially after the switch to per-visit MiddlewareInterface objects. Do workers ever need to use something that is only added to the repo by some other worker? Or is this to avoid races when creating some shared repo content?

Member Author (@kfindeisen, Apr 13, 2023):

The refresh might not be strictly necessary, because currently there's still only one MiddlewareInterface instance per process/worker at a time. But MiddlewareInterface itself no longer guarantees that there are no concurrent writes to the local repo, and we might add a more complex execution framework in the activator later without realizing that MWI was making assumptions. So I feel better having a refresh before each operation, even if it slows things down a bit.

To answer your more specific questions, we expect to only import calibs/templates/refcats/etc. once, the first time they're needed (this is the main advantage of reusing the local repo). So while each local repo is unique to its worker, a MiddlewareInterface object may indeed need to use something that was added by a different object.

Member Author (@kfindeisen, Apr 13, 2023):

Oh, sorry, I misread the original question. Yes, the central_butler refreshes are to avoid conflicts between concurrently writing workers. So far the only such conflicts have involved things like collection creation, and there shouldn't be any right now, but as with the local repo I'm trying to be robust to unknown future changes. There's still a lot of churn in architecture, execution strategy, repo organization, etc.
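
A minimal sketch of the refresh-before-operation pattern being discussed, mirroring what _prep_collections does in this diff; the wrapper function and its name are hypothetical:

from lsst.daf.butler import Butler, CollectionType

def register_output_run(butler: Butler, output_run: str) -> None:
    # Another worker (or another MiddlewareInterface object) may have created
    # collections since this registry last loaded its caches; refresh so that
    # registerCollection operates on a current view.
    butler.registry.refresh()
    butler.registry.registerCollection(output_run, CollectionType.RUN)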


with tempfile.NamedTemporaryFile(mode="w+b", suffix=".yaml") as export_file:
with self.central_butler.export(filename=export_file.name, format="yaml") as export:
@@ -607,6 +633,7 @@ def _prep_collections(self, visit: Visit):
is prefixed by ``self.output_collection``.
"""
output_run = self._get_output_run(visit)
self.butler.registry.refresh()
self.butler.registry.registerCollection(output_run, CollectionType.RUN)
# As of Feb 2023, this moves output_run to the front of the chain if
# it's already present, but this behavior cannot be relied upon.
@@ -750,17 +777,17 @@ def run_pipeline(self, visit: Visit, exposure_ids: set[int]) -> None:
output_run_butler = Butler(butler=self.butler,
collections=(self._get_init_output_run(visit), ) + self.butler.collections,
run=output_run)
executor = SimplePipelineExecutor.from_pipeline(pipeline,
where=where,
butler=output_run_butler)
if len(executor.quantum_graph) == 0:
executor = SeparablePipelineExecutor(output_run_butler, clobber_output=False, skip_existing_in=None)
qgraph = executor.make_quantum_graph(pipeline, where=where)
if len(qgraph) == 0:
# TODO: a good place for a custom exception?
raise RuntimeError("No data to process.")
_log.info(f"Running '{pipeline._pipelineIR.description}' on {where}")
# If this is a fresh (local) repo, then types like calexp,
# *Diff_diaSrcTable, etc. have not been registered.
result = executor.run(register_dataset_types=True)
_log.info(f"Pipeline successfully run on {len(result)} quanta for "
executor.pre_execute_qgraph(qgraph, register_dataset_types=True, save_init_outputs=True)
_log.info(f"Running '{pipeline._pipelineIR.description}' on {where}")
executor.run_pipeline(qgraph)
_log.info(f"Pipeline successfully run on "
f"detector {visit.detector} of {exposure_ids}.")

def export_outputs(self, visit: Visit, exposure_ids: set[int]) -> None:
@@ -825,6 +852,9 @@ def _export_subset(self, visit: Visit, exposure_ids: set[int],
The collections to transfer from; can be any expression described
in :ref:`daf_butler_collection_expressions`.
"""
# local repo may have been modified by other MWI instances.
self.butler.registry.refresh()

try:
# Need to iterate over datasets at least twice, so list.
datasets = list(self.butler.registry.queryDatasets(
@@ -873,6 +903,7 @@ def clean_local_repo(self, visit: Visit, exposure_ids: set[int]) -> None:
exposure_ids : `set` [`int`]
Identifiers of the exposures to be removed.
"""
self.butler.registry.refresh()
raws = self.butler.registry.queryDatasets(
'raw',
collections=self.instrument.makeDefaultRawIngestRunName(),
40 changes: 28 additions & 12 deletions tests/test_middleware_interface.py
@@ -41,7 +41,7 @@
import lsst.resources

from activator.visit import Visit
from activator.middleware_interface import get_central_butler, MiddlewareInterface, \
from activator.middleware_interface import get_central_butler, make_local_repo, MiddlewareInterface, \
_filter_datasets, _prepend_collection, _remove_from_chain, _MissingDatasetError

# The short name of the instrument used in the test repo.
@@ -133,9 +133,9 @@ def setUp(self):
instrument = "lsst.obs.decam.DarkEnergyCamera"
self.input_data = os.path.join(data_dir, "input_data")
# Have to preserve the tempdir, so that it doesn't get cleaned up.
self.workspace = tempfile.TemporaryDirectory()
self.local_repo = make_local_repo(tempfile.gettempdir(), central_butler, instname)
self.interface = MiddlewareInterface(central_butler, self.input_data, instrument,
skymap_name, self.workspace.name,
skymap_name, self.local_repo.name,
prefix="file://")

# coordinates from DECam data in ap_verify_ci_hits2015 for visit 411371
@@ -164,8 +164,7 @@ def setUp(self):
def tearDown(self):
super().tearDown()
# TemporaryDirectory warns on leaks
self.interface._repo.cleanup() # TODO: should MiddlewareInterface have a cleanup method?
self.workspace.cleanup()
self.local_repo.cleanup()

def test_get_butler(self):
for butler in [get_central_butler(self.central_repo, "lsst.obs.decam.DarkEnergyCamera"),
@@ -178,6 +177,17 @@ def test_get_butler(self):
self.assertEqual(list(butler.collections), [f"{instname}/defaults"])
self.assertTrue(butler.isWriteable())

def test_make_local_repo(self):
for inst in [instname, "lsst.obs.decam.DarkEnergyCamera"]:
with make_local_repo(tempfile.gettempdir(), Butler(self.central_repo), inst) as repo_dir:
self.assertTrue(os.path.exists(repo_dir))
butler = Butler(repo_dir)
self.assertEqual([x.dataId for x in butler.registry.queryDimensionRecords("instrument")],
[DataCoordinate.standardize({"instrument": instname},
universe=butler.dimensions)])
self.assertIn(f"{instname}/defaults", butler.registry.queryCollections())
self.assertFalse(os.path.exists(repo_dir))

def test_init(self):
"""Basic tests of the initialized interface object.
"""
@@ -348,10 +358,17 @@ def test_run_pipeline(self):
mock.return_value = file_data
self.interface.ingest_image(self.next_visit, filename)

with unittest.mock.patch("activator.middleware_interface.SimplePipelineExecutor.run") as mock_run:
with unittest.mock.patch(
"activator.middleware_interface.SeparablePipelineExecutor.pre_execute_qgraph") \
as mock_preexec, \
unittest.mock.patch("activator.middleware_interface.SeparablePipelineExecutor.run_pipeline") \
as mock_run:
with self.assertLogs(self.logger_name, level="INFO") as logs:
self.interface.run_pipeline(self.next_visit, {1})
mock_run.assert_called_once_with(register_dataset_types=True)
mock_preexec.assert_called_once()
# Pre-execution may have other arguments as needed; no requirement either way.
self.assertEqual(mock_preexec.call_args.kwargs["register_dataset_types"], True)
mock_run.assert_called_once()
# Check that we configured the right pipeline.
self.assertIn("End to end Alert Production pipeline specialized for HiTS-2015",
"\n".join(logs.output))
@@ -600,10 +617,11 @@ def setUp(self):
instrument = "lsst.obs.decam.DarkEnergyCamera"
data_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data")
self.input_data = os.path.join(data_dir, "input_data")
workspace = tempfile.TemporaryDirectory()

local_repo = make_local_repo(tempfile.gettempdir(), central_butler, instname)
# TemporaryDirectory warns on leaks; addCleanup also keeps the TD from
# getting garbage-collected.
self.addCleanup(tempfile.TemporaryDirectory.cleanup, workspace)
self.addCleanup(tempfile.TemporaryDirectory.cleanup, local_repo)

# coordinates from DECam data in ap_verify_ci_hits2015 for visit 411371
ra = 155.4702849608958
@@ -631,10 +649,8 @@

# Populate repository.
self.interface = MiddlewareInterface(central_butler, self.input_data, instrument,
skymap_name, workspace.name,
skymap_name, local_repo.name,
prefix="file://")
# TODO: should MiddlewareInterface have a cleanup method?
self.addCleanup(tempfile.TemporaryDirectory.cleanup, self.interface._repo)
self.interface.prep_butler(self.next_visit)
filename = "fakeRawImage.fits"
filepath = os.path.join(self.input_data, filename)