diff --git a/python/activator/activator.py b/python/activator/activator.py
index c647ddf9..addd3b54 100644
--- a/python/activator/activator.py
+++ b/python/activator/activator.py
@@ -33,6 +33,7 @@
import confluent_kafka as kafka
from flask import Flask, request
+from .config import PipelinesConfig
from .logger import setup_usdf_logger
from .make_pgpass import make_pgpass
from .middleware_interface import get_central_butler, make_local_repo, MiddlewareInterface
@@ -66,6 +67,8 @@
kafka_group_id = str(uuid.uuid4())
# The topic on which to listen to updates to image_bucket
bucket_topic = os.environ.get("BUCKET_TOPIC", "rubin-prompt-processing")
+# The pipelines to execute and the conditions in which to choose them.
+pipelines = PipelinesConfig(os.environ["PIPELINES_CONFIG"])
setup_usdf_logger(
labels={"instrument": instrument_name},
@@ -245,6 +248,7 @@ def next_visit_handler() -> Tuple[str, int]:
mwi = MiddlewareInterface(central_butler,
image_bucket,
expected_visit,
+ pipelines,
skymap,
local_repo.name)
# Copy calibrations for this detector/visit
diff --git a/python/activator/config.py b/python/activator/config.py
new file mode 100644
index 00000000..dd38b8dd
--- /dev/null
+++ b/python/activator/config.py
@@ -0,0 +1,137 @@
+# This file is part of prompt_prototype.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+
+__all__ = ["PipelinesConfig"]
+
+
+import collections.abc
+import os
+import re
+
+from .visit import FannedOutVisit
+
+
+class PipelinesConfig:
+ """A pipeline configuration for the Prompt Processing service.
+
+ This class provides the execution framework with a simple interface for
+ identifying the pipeline to execute. It attempts to abstract the details of
+ which factors affect the choice of pipeline to make it easier to add new
+ features in the future.
+
+ Parameters
+ ----------
+ config : `str`
+ A string describing pipeline selection criteria. The current format is
+ a space-delimited list of mappings, each of which has the format
+ ``(survey="")=``. The pipeline path may contain
+ environment variables. No key or value may contain the "=" character.
+ See examples below.
+
+ Notes
+ -----
+ While it is not expected that there will ever be more than one
+ PipelinesConfig instance in a program's lifetime, this class is *not* a
+ singleton and objects must be passed explicitly to the code that
+ needs them.
+
+ Examples
+ --------
+ A single-survey config:
+
+ >>> PipelinesConfig('(survey="TestSurvey")=/etc/pipelines/SingleFrame.yaml') # doctest: +ELLIPSIS
+
+
+ A config with multiple surveys, and environment variables:
+
+ >>> PipelinesConfig('(survey="TestSurvey")=/etc/pipelines/ApPipe.yaml '
+ ... '(survey="Camera Test")=${AP_PIPE_DIR}/pipelines/LSSTComCam/Isr.yaml '
+ ... '(survey="")=${AP_PIPE_DIR}/pipelines/LSSTComCam/Isr.yaml ')
+ ... # doctest: +ELLIPSIS
+
+ """
+
+ def __init__(self, config: str):
+ if not config:
+ raise ValueError("Must configure at least one pipeline.")
+
+ self._mapping = self._parse_config(config)
+
+ @staticmethod
+ def _parse_config(config: str) -> collections.abc.Mapping:
+ """Turn a config string into structured config information.
+
+ Parameters
+ ----------
+ config : `str`
+ A string describing pipeline selection criteria. The current format
+ is a space-delimited list of mappings, each of which has the format
+ '(survey="")='. The pipeline path may contain
+ environment variables. No key or value may contain the "="
+ character.
+
+ Returns
+ -------
+ config : mapping [`str`, `str`]
+ A mapping from the survey type to the pipeline to run for that
+ survey. A more complex key or container type may be needed in the
+ future, if other pipeline selection criteria are added.
+
+ Raises
+ ------
+ ValueError
+ Raised if the string cannot be parsed.
+ """
+ # Use regex instead of str.split, in case keys or values also have spaces.
+ node = re.compile(r'\s*\(survey="(?P[\w\s]*)"\)='
+ r'(?P[-\w./${} ]*[-\w./${}])(?:\s+|$)')
+
+ items = {}
+ pos = 0
+ match = node.match(config, pos)
+ while match:
+ items[match['survey']] = match['filename']
+
+ pos = match.end()
+ match = node.match(config, pos)
+ if pos != len(config):
+ raise ValueError(f"Unexpected text at position {pos}: '{config[pos:]}'.")
+
+ return items
+
+ def get_pipeline_file(self, visit: FannedOutVisit) -> str:
+ """Identify the pipeline to be run, based on the provided visit.
+
+ Parameters
+ ----------
+ visit : `activator.visit.FannedOutVisit`
+ The visit for which a pipeline must be selected.
+
+ Returns
+ -------
+ pipeline : `str`
+ A path to a configured pipeline file.
+ """
+ try:
+ return os.path.expandvars(self._mapping[visit.survey])
+ except KeyError as e:
+ raise RuntimeError(f"Unsupported survey: {visit.survey}") from e
diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py
index 62fcffa9..17108f73 100644
--- a/python/activator/middleware_interface.py
+++ b/python/activator/middleware_interface.py
@@ -31,7 +31,6 @@
import tempfile
import typing
-from lsst.utils import getPackageDir
from lsst.resources import ResourcePath
import lsst.afw.cameraGeom
from lsst.ctrl.mpexec import SeparablePipelineExecutor
@@ -41,6 +40,7 @@
import lsst.obs.base
import lsst.pipe.base
+from .config import PipelinesConfig
from .visit import FannedOutVisit
_log = logging.getLogger("lsst." + __name__)
@@ -156,6 +156,8 @@ class MiddlewareInterface:
See also ``prefix``.
visit : `activator.visit.FannedOutVisit`
The visit-detector combination to be processed by this object.
+ pipelines : `activator.config.PipelinesConfig`
+ Information about which pipelines to run on ``visit``.
skymap: `str`
Name of the skymap in the central repo for querying templates.
local_repo : `str`
@@ -194,7 +196,7 @@ class MiddlewareInterface:
# of self.output_collection.
def __init__(self, central_butler: Butler, image_bucket: str, visit: FannedOutVisit,
- skymap: str, local_repo: str,
+ pipelines: PipelinesConfig, skymap: str, local_repo: str,
prefix: str = "s3://"):
self.visit = visit
if self.visit.coordinateSystem != FannedOutVisit.CoordSys.ICRS:
@@ -217,6 +219,7 @@ def __init__(self, central_butler: Butler, image_bucket: str, visit: FannedOutVi
self._download_store = None
# TODO: how much overhead do we pick up from going through the registry?
self.instrument = lsst.obs.base.Instrument.from_string(visit.instrument, central_butler.registry)
+ self.pipelines = pipelines
self.output_collection = self.instrument.makeCollectionName("prompt")
self.init_output_run = self._get_init_output_run()
@@ -632,13 +635,7 @@ def _get_pipeline_file(self) -> str:
pipeline : `str`
A path to a configured pipeline file.
"""
- # TODO: We hacked the basepath in the Dockerfile so this works both in
- # development and in service container, but it would be better if there
- # were a path that's valid in both.
- return os.path.join(getPackageDir("prompt_prototype"),
- "pipelines",
- self.visit.instrument,
- "ApPipe.yaml")
+ return self.pipelines.get_pipeline_file(self.visit)
def _prep_pipeline(self) -> lsst.pipe.base.Pipeline:
"""Setup the pipeline to be run, based on the configured instrument and
@@ -658,8 +655,8 @@ def _prep_pipeline(self) -> lsst.pipe.base.Pipeline:
ap_pipeline_file = self._get_pipeline_file()
try:
pipeline = lsst.pipe.base.Pipeline.fromFile(ap_pipeline_file)
- except FileNotFoundError:
- raise RuntimeError(f"No ApPipe.yaml defined for camera {self.instrument.getName()}")
+ except FileNotFoundError as e:
+ raise RuntimeError from e
try:
pipeline.addConfigOverride("diaPipe", "apdb.db_url", self._apdb_uri)
diff --git a/pipelines/DECam/ApPipe.yaml b/tests/data/ApPipe.yaml
similarity index 100%
rename from pipelines/DECam/ApPipe.yaml
rename to tests/data/ApPipe.yaml
diff --git a/tests/test_config.py b/tests/test_config.py
new file mode 100644
index 00000000..b5dae730
--- /dev/null
+++ b/tests/test_config.py
@@ -0,0 +1,161 @@
+# This file is part of prompt_prototype.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+
+import dataclasses
+import os
+import unittest
+
+from lsst.utils import getPackageDir
+
+from activator.config import PipelinesConfig
+from activator.visit import FannedOutVisit
+
+
+TESTDIR = os.path.abspath(os.path.dirname(__file__))
+
+
+class PipelinesConfigTest(unittest.TestCase):
+ def setUp(self):
+ super().setUp()
+
+ self.visit = FannedOutVisit(
+ instrument="NotACam",
+ detector=42,
+ groupId="2023-01-23T23:33:14.762",
+ nimages=2,
+ filters="k2022",
+ coordinateSystem=FannedOutVisit.CoordSys.ICRS,
+ position=[134.5454, -65.3261],
+ rotationSystem=FannedOutVisit.RotSys.SKY,
+ cameraAngle=135.0,
+ survey="TestSurvey",
+ salIndex=42,
+ scriptSalIndex=42,
+ dome=FannedOutVisit.Dome.OPEN,
+ duration=35.0,
+ totalCheckpoints=1,
+ )
+
+ def test_main_survey(self):
+ config = PipelinesConfig(
+ ' (survey="TestSurvey")=${PROMPT_PROTOTYPE_DIR}/pipelines/NotACam/ApPipe.yaml')
+ self.assertEqual(
+ config.get_pipeline_file(self.visit),
+ os.path.normpath(os.path.join(TESTDIR, "..", "pipelines", "NotACam", "ApPipe.yaml"))
+ )
+
+ def test_selection(self):
+ config = PipelinesConfig('(survey="TestSurvey")=/etc/pipelines/SingleFrame.yaml '
+ '(survey="CameraTest")=${AP_PIPE_DIR}/pipelines/Isr.yaml '
+ '(survey="")=Default.yaml '
+ )
+ self.assertEqual(
+ config.get_pipeline_file(self.visit),
+ os.path.normpath(os.path.join("/etc", "pipelines", "SingleFrame.yaml"))
+ )
+ self.assertEqual(
+ config.get_pipeline_file(dataclasses.replace(self.visit, survey="CameraTest")),
+ os.path.normpath(os.path.join(getPackageDir("ap_pipe"), "pipelines", "Isr.yaml"))
+ )
+ self.assertEqual(
+ config.get_pipeline_file(dataclasses.replace(self.visit, survey="")),
+ "Default.yaml"
+ )
+
+ def test_multiline(self):
+ config = PipelinesConfig('''(survey="TestSurvey")=/etc/pipelines/SingleFrame.yaml
+ (survey="CameraTest")=${AP_PIPE_DIR}/pipelines/Isr.yaml
+ '''
+ )
+ self.assertEqual(
+ config.get_pipeline_file(self.visit),
+ os.path.normpath(os.path.join("/etc", "pipelines", "SingleFrame.yaml"))
+ )
+ self.assertEqual(
+ config.get_pipeline_file(dataclasses.replace(self.visit, survey="CameraTest")),
+ os.path.normpath(os.path.join(getPackageDir("ap_pipe"), "pipelines", "Isr.yaml"))
+ )
+
+ def test_space(self):
+ config = PipelinesConfig('(survey="TestSurvey")=/dir with space/pipelines/SingleFrame.yaml '
+ '(survey="Camera Test")=${AP_PIPE_DIR}/pipe lines/Isr.yaml '
+ )
+ self.assertEqual(
+ config.get_pipeline_file(self.visit),
+ os.path.normpath(os.path.join("/dir with space", "pipelines", "SingleFrame.yaml"))
+ )
+ self.assertEqual(
+ config.get_pipeline_file(dataclasses.replace(self.visit, survey="Camera Test")),
+ os.path.normpath(os.path.join(getPackageDir("ap_pipe"), "pipe lines", "Isr.yaml"))
+ )
+
+ def test_nomatch(self):
+ config = PipelinesConfig('(survey="TestSurvey")=/etc/pipelines/SingleFrame.yaml '
+ '(survey="CameraTest")=${AP_PIPE_DIR}/pipelines/Isr.yaml '
+ )
+ with self.assertRaises(RuntimeError):
+ config.get_pipeline_file(dataclasses.replace(self.visit, survey="Surprise"))
+
+ def test_empty(self):
+ with self.assertRaises(ValueError):
+ PipelinesConfig('')
+ with self.assertRaises(ValueError):
+ PipelinesConfig(None)
+
+ def test_commas(self):
+ with self.assertRaises(ValueError):
+ PipelinesConfig('(survey="TestSurvey")=/etc/pipelines/SingleFrame.yaml, '
+ '(survey="CameraTest")=${AP_PIPE_DIR}/pipelines/Isr.yaml '
+ )
+ with self.assertRaises(ValueError):
+ PipelinesConfig('(survey="TestSurvey")=/etc/pipelines/SingleFrame.yaml,'
+ '(survey="CameraTest")=${AP_PIPE_DIR}/pipelines/Isr.yaml '
+ )
+
+ def test_unlabeled(self):
+ with self.assertRaises(ValueError):
+ PipelinesConfig('(survey="TestSurvey")=/etc/pipelines/SingleFrame.yaml, '
+ '("CameraTest")=${AP_PIPE_DIR}/pipelines/Isr.yaml '
+ )
+
+ def test_oddlabel(self):
+ with self.assertRaises(ValueError):
+ PipelinesConfig('(reason="TestSurvey")=/etc/pipelines/SingleFrame.yaml')
+
+ def test_nospace(self):
+ with self.assertRaises(ValueError):
+ PipelinesConfig('(survey="TestSurvey")=/etc/pipelines/SingleFrame.yaml'
+ '(survey="CameraTest")=${AP_PIPE_DIR}/pipelines/Isr.yaml'
+ )
+ with self.assertRaises(ValueError):
+ PipelinesConfig('/etc/pipelines/SingleFrame.yaml'
+ '(survey="CameraTest")=${AP_PIPE_DIR}/pipelines/Isr.yaml'
+ )
+
+ def test_noequal(self):
+ with self.assertRaises(ValueError):
+ PipelinesConfig('/etc/pipelines/SingleFrame.yaml')
+
+ with self.assertRaises(ValueError):
+ PipelinesConfig('/etc/pipelines/SingleFrame.yaml '
+ '(survey="CameraTest")=${AP_PIPE_DIR}/pipelines/Isr.yaml '
+ )
diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py
index d7f2c025..05f63b64 100644
--- a/tests/test_middleware_interface.py
+++ b/tests/test_middleware_interface.py
@@ -40,6 +40,7 @@
from lsst.obs.base.ingest import RawFileDatasetInfo, RawFileData
import lsst.resources
+from activator.config import PipelinesConfig
from activator.visit import FannedOutVisit
from activator.middleware_interface import get_central_butler, make_local_repo, MiddlewareInterface, \
_filter_datasets, _prepend_collection, _remove_from_chain, _MissingDatasetError
@@ -50,6 +51,8 @@
filter = "g DECam SDSS c0001 4720.0 1520.0"
# The skymap name used in the test repo.
skymap_name = "deepCoadd_skyMap"
+# A pipelines config that returns the test pipeline.
+pipelines = PipelinesConfig('(survey="SURVEY")=${PROMPT_PROTOTYPE_DIR}/tests/data/ApPipe.yaml')
def fake_file_data(filename, dimensions, instrument, visit):
@@ -156,7 +159,7 @@ def setUp(self):
)
self.logger_name = "lsst.activator.middleware_interface"
self.interface = MiddlewareInterface(self.central_butler, self.input_data, self.next_visit,
- skymap_name, self.local_repo.name,
+ pipelines, skymap_name, self.local_repo.name,
prefix="file://")
def tearDown(self):
@@ -278,7 +281,7 @@ def test_prep_butler_twice(self):
# Second visit with everything same except group.
second_visit = dataclasses.replace(self.next_visit, groupId=str(int(self.next_visit.groupId) + 1))
second_interface = MiddlewareInterface(self.central_butler, self.input_data, second_visit,
- skymap_name, self.local_repo.name,
+ pipelines, skymap_name, self.local_repo.name,
prefix="file://")
second_interface.prep_butler()
@@ -295,7 +298,7 @@ def test_prep_butler_twice(self):
self.next_visit.position[1] - 1.2],
)
third_interface = MiddlewareInterface(self.central_butler, self.input_data, third_visit,
- skymap_name, self.local_repo.name,
+ pipelines, skymap_name, self.local_repo.name,
prefix="file://")
third_interface.prep_butler()
expected_shards.update({157393, 157395})
@@ -670,7 +673,7 @@ def setUp(self):
# Populate repository.
self.interface = MiddlewareInterface(central_butler, self.input_data, self.next_visit,
- skymap_name, local_repo.name,
+ pipelines, skymap_name, local_repo.name,
prefix="file://")
self.interface.prep_butler()
filename = "fakeRawImage.fits"
@@ -686,7 +689,7 @@ def setUp(self):
self.interface.instrument,
self.second_visit)
self.second_interface = MiddlewareInterface(central_butler, self.input_data, self.second_visit,
- skymap_name, second_local_repo.name,
+ pipelines, skymap_name, second_local_repo.name,
prefix="file://")
with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock:
diff --git a/tests/test_tester_utils.py b/tests/test_tester_utils.py
index 8849aa51..3a5b209e 100644
--- a/tests/test_tester_utils.py
+++ b/tests/test_tester_utils.py
@@ -27,7 +27,7 @@
from moto import mock_s3
import lsst.daf.butler.tests as butler_tests
-from lsst.obs.base import ExposureIdInfo
+import lsst.meas.base
from lsst.obs.subaru import HyperSuprimeCam
from activator.raw import get_raw_path
@@ -84,7 +84,7 @@ def test_get_last_group(self):
def test_exposure_id_hsc(self):
group = "2023011100026"
- # Need a Butler registry to test ExposureIdInfo
+ # Need a Butler registry to test IdGenerator
with tempfile.TemporaryDirectory() as repo:
butler = butler_tests.makeTestRepo(repo)
HyperSuprimeCam().register(butler.registry)
@@ -101,8 +101,9 @@ def test_exposure_id_hsc(self):
# Above assertion passes if exp_id has 9+ digits, but such IDs aren't valid.
self.assertEqual(len(str_exp_id[4:]), 8)
self.assertLessEqual(exp_id, exp_max)
- # test that ExposureIdInfo.fromDataID does not raise
- ExposureIdInfo.fromDataId(data_id, "visit_detector")
+ # test that IdGenerator.unpacker_from_config does not raise
+ config = lsst.meas.base.DetectorVisitIdGeneratorConfig()
+ lsst.meas.base.IdGenerator.unpacker_from_config(config, data_id)
def test_exposure_id_hsc_limits(self):
# Confirm that the exposure ID generator works as long as advertised: