Skip to content
4 changes: 4 additions & 0 deletions python/activator/activator.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import confluent_kafka as kafka
from flask import Flask, request

from .config import PipelinesConfig
from .logger import setup_usdf_logger
from .make_pgpass import make_pgpass
from .middleware_interface import get_central_butler, make_local_repo, MiddlewareInterface
Expand Down Expand Up @@ -66,6 +67,8 @@
kafka_group_id = str(uuid.uuid4())
# The topic on which to listen to updates to image_bucket
bucket_topic = os.environ.get("BUCKET_TOPIC", "rubin-prompt-processing")
# The pipelines to execute and the conditions in which to choose them.
pipelines = PipelinesConfig(os.environ["PIPELINES_CONFIG"])

setup_usdf_logger(
labels={"instrument": instrument_name},
Expand Down Expand Up @@ -245,6 +248,7 @@ def next_visit_handler() -> Tuple[str, int]:
mwi = MiddlewareInterface(central_butler,
image_bucket,
expected_visit,
pipelines,
skymap,
local_repo.name)
# Copy calibrations for this detector/visit
Expand Down
137 changes: 137 additions & 0 deletions python/activator/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
# This file is part of prompt_prototype.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.


__all__ = ["PipelinesConfig"]


import collections.abc
import os
import re

from .visit import FannedOutVisit


class PipelinesConfig:
"""A pipeline configuration for the Prompt Processing service.

This class provides the execution framework with a simple interface for
identifying the pipeline to execute. It attempts to abstract the details of
which factors affect the choice of pipeline to make it easier to add new
features in the future.

Parameters
----------
config : `str`
A string describing pipeline selection criteria. The current format is
a space-delimited list of mappings, each of which has the format
``(survey="<survey>")=<pipeline>``. The pipeline path may contain
environment variables. No key or value may contain the "=" character.
See examples below.

Notes
-----
While it is not expected that there will ever be more than one
PipelinesConfig instance in a program's lifetime, this class is *not* a
singleton and objects must be passed explicitly to the code that
needs them.

Examples
--------
A single-survey config:

>>> PipelinesConfig('(survey="TestSurvey")=/etc/pipelines/SingleFrame.yaml') # doctest: +ELLIPSIS
<config.PipelinesConfig object at 0x...>

A config with multiple surveys, and environment variables:

>>> PipelinesConfig('(survey="TestSurvey")=/etc/pipelines/ApPipe.yaml '
... '(survey="Camera Test")=${AP_PIPE_DIR}/pipelines/LSSTComCam/Isr.yaml '
... '(survey="")=${AP_PIPE_DIR}/pipelines/LSSTComCam/Isr.yaml ')
... # doctest: +ELLIPSIS
<config.PipelinesConfig object at 0x...>
"""

def __init__(self, config: str):
if not config:
raise ValueError("Must configure at least one pipeline.")

self._mapping = self._parse_config(config)

@staticmethod
def _parse_config(config: str) -> collections.abc.Mapping:
"""Turn a config string into structured config information.

Parameters
----------
config : `str`
A string describing pipeline selection criteria. The current format
is a space-delimited list of mappings, each of which has the format
'(survey="<survey>")=<pipeline>'. The pipeline path may contain
environment variables. No key or value may contain the "="
character.

Returns
-------
config : mapping [`str`, `str`]
A mapping from the survey type to the pipeline to run for that
survey. A more complex key or container type may be needed in the
future, if other pipeline selection criteria are added.

Raises
------
ValueError
Raised if the string cannot be parsed.
"""
# Use regex instead of str.split, in case keys or values also have spaces.
node = re.compile(r'\s*\(survey="(?P<survey>[\w\s]*)"\)='
r'(?P<filename>[-\w./${} ]*[-\w./${}])(?:\s+|$)')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably just a matter of taste. Would r'(?P<filename>[^=,]*[^=, ])(?:\s+|$)')
be slightly easier to read and still correct?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had that before, actually, but then got worried that that might match some unexpected (and possibly malicious) string. I figured it's best to err on the side of being too restrictive.

I agree that ^=, would be easier to read.


items = {}
pos = 0
match = node.match(config, pos)
while match:
items[match['survey']] = match['filename']

pos = match.end()
match = node.match(config, pos)
if pos != len(config):
raise ValueError(f"Unexpected text at position {pos}: '{config[pos:]}'.")

return items

def get_pipeline_file(self, visit: FannedOutVisit) -> str:
"""Identify the pipeline to be run, based on the provided visit.

Parameters
----------
visit : `activator.visit.FannedOutVisit`
The visit for which a pipeline must be selected.

Returns
-------
pipeline : `str`
A path to a configured pipeline file.
"""
try:
return os.path.expandvars(self._mapping[visit.survey])
except KeyError as e:
raise RuntimeError(f"Unsupported survey: {visit.survey}") from e
19 changes: 8 additions & 11 deletions python/activator/middleware_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import tempfile
import typing

from lsst.utils import getPackageDir
from lsst.resources import ResourcePath
import lsst.afw.cameraGeom
from lsst.ctrl.mpexec import SeparablePipelineExecutor
Expand All @@ -41,6 +40,7 @@
import lsst.obs.base
import lsst.pipe.base

from .config import PipelinesConfig
from .visit import FannedOutVisit

_log = logging.getLogger("lsst." + __name__)
Expand Down Expand Up @@ -156,6 +156,8 @@ class MiddlewareInterface:
See also ``prefix``.
visit : `activator.visit.FannedOutVisit`
The visit-detector combination to be processed by this object.
pipelines : `activator.config.PipelinesConfig`
Information about which pipelines to run on ``visit``.
skymap: `str`
Name of the skymap in the central repo for querying templates.
local_repo : `str`
Expand Down Expand Up @@ -194,7 +196,7 @@ class MiddlewareInterface:
# of self.output_collection.

def __init__(self, central_butler: Butler, image_bucket: str, visit: FannedOutVisit,
skymap: str, local_repo: str,
pipelines: PipelinesConfig, skymap: str, local_repo: str,
prefix: str = "s3://"):
self.visit = visit
if self.visit.coordinateSystem != FannedOutVisit.CoordSys.ICRS:
Expand All @@ -217,6 +219,7 @@ def __init__(self, central_butler: Butler, image_bucket: str, visit: FannedOutVi
self._download_store = None
# TODO: how much overhead do we pick up from going through the registry?
self.instrument = lsst.obs.base.Instrument.from_string(visit.instrument, central_butler.registry)
self.pipelines = pipelines

self.output_collection = self.instrument.makeCollectionName("prompt")
self.init_output_run = self._get_init_output_run()
Expand Down Expand Up @@ -632,13 +635,7 @@ def _get_pipeline_file(self) -> str:
pipeline : `str`
A path to a configured pipeline file.
"""
# TODO: We hacked the basepath in the Dockerfile so this works both in
# development and in service container, but it would be better if there
# were a path that's valid in both.
return os.path.join(getPackageDir("prompt_prototype"),
"pipelines",
self.visit.instrument,
"ApPipe.yaml")
return self.pipelines.get_pipeline_file(self.visit)

def _prep_pipeline(self) -> lsst.pipe.base.Pipeline:
"""Setup the pipeline to be run, based on the configured instrument and
Expand All @@ -658,8 +655,8 @@ def _prep_pipeline(self) -> lsst.pipe.base.Pipeline:
ap_pipeline_file = self._get_pipeline_file()
try:
pipeline = lsst.pipe.base.Pipeline.fromFile(ap_pipeline_file)
except FileNotFoundError:
raise RuntimeError(f"No ApPipe.yaml defined for camera {self.instrument.getName()}")
except FileNotFoundError as e:
raise RuntimeError from e

try:
pipeline.addConfigOverride("diaPipe", "apdb.db_url", self._apdb_uri)
Expand Down
File renamed without changes.
161 changes: 161 additions & 0 deletions tests/test_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
# This file is part of prompt_prototype.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.


import dataclasses
import os
import unittest

from lsst.utils import getPackageDir

from activator.config import PipelinesConfig
from activator.visit import FannedOutVisit


TESTDIR = os.path.abspath(os.path.dirname(__file__))


class PipelinesConfigTest(unittest.TestCase):
def setUp(self):
super().setUp()

self.visit = FannedOutVisit(
instrument="NotACam",
detector=42,
groupId="2023-01-23T23:33:14.762",
nimages=2,
filters="k2022",
coordinateSystem=FannedOutVisit.CoordSys.ICRS,
position=[134.5454, -65.3261],
rotationSystem=FannedOutVisit.RotSys.SKY,
cameraAngle=135.0,
survey="TestSurvey",
salIndex=42,
scriptSalIndex=42,
dome=FannedOutVisit.Dome.OPEN,
duration=35.0,
totalCheckpoints=1,
)

def test_main_survey(self):
config = PipelinesConfig(
' (survey="TestSurvey")=${PROMPT_PROTOTYPE_DIR}/pipelines/NotACam/ApPipe.yaml')
self.assertEqual(
config.get_pipeline_file(self.visit),
os.path.normpath(os.path.join(TESTDIR, "..", "pipelines", "NotACam", "ApPipe.yaml"))
)

def test_selection(self):
config = PipelinesConfig('(survey="TestSurvey")=/etc/pipelines/SingleFrame.yaml '
'(survey="CameraTest")=${AP_PIPE_DIR}/pipelines/Isr.yaml '
'(survey="")=Default.yaml '
)
self.assertEqual(
config.get_pipeline_file(self.visit),
os.path.normpath(os.path.join("/etc", "pipelines", "SingleFrame.yaml"))
)
self.assertEqual(
config.get_pipeline_file(dataclasses.replace(self.visit, survey="CameraTest")),
os.path.normpath(os.path.join(getPackageDir("ap_pipe"), "pipelines", "Isr.yaml"))
)
self.assertEqual(
config.get_pipeline_file(dataclasses.replace(self.visit, survey="")),
"Default.yaml"
)

def test_multiline(self):
config = PipelinesConfig('''(survey="TestSurvey")=/etc/pipelines/SingleFrame.yaml
(survey="CameraTest")=${AP_PIPE_DIR}/pipelines/Isr.yaml
'''
)
self.assertEqual(
config.get_pipeline_file(self.visit),
os.path.normpath(os.path.join("/etc", "pipelines", "SingleFrame.yaml"))
)
self.assertEqual(
config.get_pipeline_file(dataclasses.replace(self.visit, survey="CameraTest")),
os.path.normpath(os.path.join(getPackageDir("ap_pipe"), "pipelines", "Isr.yaml"))
)

def test_space(self):
config = PipelinesConfig('(survey="TestSurvey")=/dir with space/pipelines/SingleFrame.yaml '
'(survey="Camera Test")=${AP_PIPE_DIR}/pipe lines/Isr.yaml '
)
self.assertEqual(
config.get_pipeline_file(self.visit),
os.path.normpath(os.path.join("/dir with space", "pipelines", "SingleFrame.yaml"))
)
self.assertEqual(
config.get_pipeline_file(dataclasses.replace(self.visit, survey="Camera Test")),
os.path.normpath(os.path.join(getPackageDir("ap_pipe"), "pipe lines", "Isr.yaml"))
)

def test_nomatch(self):
config = PipelinesConfig('(survey="TestSurvey")=/etc/pipelines/SingleFrame.yaml '
'(survey="CameraTest")=${AP_PIPE_DIR}/pipelines/Isr.yaml '
)
with self.assertRaises(RuntimeError):
config.get_pipeline_file(dataclasses.replace(self.visit, survey="Surprise"))

def test_empty(self):
with self.assertRaises(ValueError):
PipelinesConfig('')
with self.assertRaises(ValueError):
PipelinesConfig(None)

def test_commas(self):
with self.assertRaises(ValueError):
PipelinesConfig('(survey="TestSurvey")=/etc/pipelines/SingleFrame.yaml, '
'(survey="CameraTest")=${AP_PIPE_DIR}/pipelines/Isr.yaml '
)
with self.assertRaises(ValueError):
PipelinesConfig('(survey="TestSurvey")=/etc/pipelines/SingleFrame.yaml,'
'(survey="CameraTest")=${AP_PIPE_DIR}/pipelines/Isr.yaml '
)

def test_unlabeled(self):
with self.assertRaises(ValueError):
PipelinesConfig('(survey="TestSurvey")=/etc/pipelines/SingleFrame.yaml, '
'("CameraTest")=${AP_PIPE_DIR}/pipelines/Isr.yaml '
)

def test_oddlabel(self):
with self.assertRaises(ValueError):
PipelinesConfig('(reason="TestSurvey")=/etc/pipelines/SingleFrame.yaml')

def test_nospace(self):
with self.assertRaises(ValueError):
PipelinesConfig('(survey="TestSurvey")=/etc/pipelines/SingleFrame.yaml'
'(survey="CameraTest")=${AP_PIPE_DIR}/pipelines/Isr.yaml'
)
with self.assertRaises(ValueError):
PipelinesConfig('/etc/pipelines/SingleFrame.yaml'
'(survey="CameraTest")=${AP_PIPE_DIR}/pipelines/Isr.yaml'
)

def test_noequal(self):
with self.assertRaises(ValueError):
PipelinesConfig('/etc/pipelines/SingleFrame.yaml')

with self.assertRaises(ValueError):
PipelinesConfig('/etc/pipelines/SingleFrame.yaml '
'(survey="CameraTest")=${AP_PIPE_DIR}/pipelines/Isr.yaml '
)
Loading