From 9236958e144f8854d0b1e7cad1d6f7b51571b187 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 15 Jun 2023 13:04:32 -0700 Subject: [PATCH 1/4] Abort execution if no pipeline available. The code now tests for an applicable pipeline as soon as it parses the next_visit message. If there is no pipeline, this is treated as a semantic error by the client (i.e., the fanout service). --- python/activator/activator.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/activator/activator.py b/python/activator/activator.py index addd3b54..d5277248 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -241,6 +241,10 @@ def next_visit_handler() -> Tuple[str, int]: return f"Bad Request: {msg}", 400 assert expected_visit.instrument == instrument_name, \ f"Expected {instrument_name}, received {expected_visit.instrument}." + if pipelines.get_pipeline_file(expected_visit) is None: + _log.info(f"No pipeline configured for {expected_visit}, skipping.") + return "No pipeline configured for the received visit.", 422 + expid_set = set() # Create a fresh MiddlewareInterface object to avoid accidental From 628018285774a1197ea640eca187331837e7c280 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 15 Jun 2023 13:35:58 -0700 Subject: [PATCH 2/4] Allow PipelinesConfig to return None for a pipeline. This change makes it possible to program the Prompt Processing service to ignore visits that we either can't or don't want to process. --- python/activator/config.py | 27 ++++++++++++++++++++------- tests/test_config.py | 10 ++++++++++ 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/python/activator/config.py b/python/activator/config.py index dd38b8dd..0fd51d46 100644 --- a/python/activator/config.py +++ b/python/activator/config.py @@ -44,7 +44,8 @@ class PipelinesConfig: A string describing pipeline selection criteria. The current format is a space-delimited list of mappings, each of which has the format ``(survey="")=``. The pipeline path may contain - environment variables. No key or value may contain the "=" character. + environment variables, or may be the keyword "None" to mean no pipeline + should be run. No key or value may contain the "=" character. See examples below. Notes @@ -68,6 +69,12 @@ class PipelinesConfig: ... '(survey="")=${AP_PIPE_DIR}/pipelines/LSSTComCam/Isr.yaml ') ... # doctest: +ELLIPSIS + + A config that omits a pipeline for non-sky data: + + >>> PipelinesConfig('(survey="TestSurvey")=/etc/pipelines/ApPipe.yaml ' + ... '(survey="Dome Flats")=None ') # doctest: +ELLIPSIS + """ def __init__(self, config: str): @@ -86,12 +93,13 @@ def _parse_config(config: str) -> collections.abc.Mapping: A string describing pipeline selection criteria. The current format is a space-delimited list of mappings, each of which has the format '(survey="")='. The pipeline path may contain - environment variables. No key or value may contain the "=" + environment variables, or may be the keyword "None" to mean no + pipeline should be run. No key or value may contain the "=" character. Returns ------- - config : mapping [`str`, `str`] + config : mapping [`str`, `str` or `None`] A mapping from the survey type to the pipeline to run for that survey. A more complex key or container type may be needed in the future, if other pipeline selection criteria are added. @@ -109,7 +117,7 @@ def _parse_config(config: str) -> collections.abc.Mapping: pos = 0 match = node.match(config, pos) while match: - items[match['survey']] = match['filename'] + items[match['survey']] = match['filename'] if match['filename'].lower() != "none" else None pos = match.end() match = node.match(config, pos) @@ -128,10 +136,15 @@ def get_pipeline_file(self, visit: FannedOutVisit) -> str: Returns ------- - pipeline : `str` - A path to a configured pipeline file. + pipeline : `str` or `None` + A path to a configured pipeline file. A value of `None` means that + *no* pipeline should be run on this visit. """ try: - return os.path.expandvars(self._mapping[visit.survey]) + value = self._mapping[visit.survey] except KeyError as e: raise RuntimeError(f"Unsupported survey: {visit.survey}") from e + if value is not None: + return os.path.expandvars(value) + else: + return value diff --git a/tests/test_config.py b/tests/test_config.py index b5dae730..06124ab4 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -108,6 +108,16 @@ def test_space(self): os.path.normpath(os.path.join(getPackageDir("ap_pipe"), "pipe lines", "Isr.yaml")) ) + def test_none(self): + config = PipelinesConfig('(survey="TestSurvey")=None shall pass/pipelines/SingleFrame.yaml ' + '(survey="Camera Test")=None ' + ) + self.assertEqual( + config.get_pipeline_file(self.visit), + os.path.normpath(os.path.join("None shall pass", "pipelines", "SingleFrame.yaml")) + ) + self.assertIsNone(config.get_pipeline_file(dataclasses.replace(self.visit, survey="Camera Test"))) + def test_nomatch(self): config = PipelinesConfig('(survey="TestSurvey")=/etc/pipelines/SingleFrame.yaml ' '(survey="CameraTest")=${AP_PIPE_DIR}/pipelines/Isr.yaml ' From 290071d262acbaa1b467e275b127b05e0db75ea6 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 15 Jun 2023 17:39:04 -0700 Subject: [PATCH 3/4] Fix collection bug in MiddlewareInterface unit tests. The time zone used to datestamp runs was changed in #68, but the tests were never updated to match. This caused failures when running the tests between 0:00 and 12:00 UTC. --- tests/test_middleware_interface.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index 05f63b64..438693ab 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -749,7 +749,7 @@ def test_export_outputs(self): self.second_interface.export_outputs({self.second_data_id["exposure"]}) central_butler = Butler(self.central_repo.name, writeable=False) - date = datetime.datetime.now(datetime.timezone.utc) + date = datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=-12))) export_collection = f"{instname}/prompt/output-{date.year:04d}-{date.month:02d}-{date.day:02d}" \ "/ApPipe/prompt-proto-service-042" self.assertEqual(self._count_datasets(central_butler, "calexp", export_collection), 2) @@ -778,7 +778,7 @@ def test_export_outputs_retry(self): self.second_interface.export_outputs({self.second_data_id["exposure"]}) central_butler = Butler(self.central_repo.name, writeable=False) - date = datetime.datetime.now(datetime.timezone.utc) + date = datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=-12))) export_collection = f"{instname}/prompt/output-{date.year:04d}-{date.month:02d}-{date.day:02d}" \ "/ApPipe/prompt-proto-service-042" self.assertEqual(self._count_datasets(central_butler, "calexp", export_collection), 2) From 500ccd0f6817070851b23073ca86c5d8de564d25 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Thu, 15 Jun 2023 17:40:42 -0700 Subject: [PATCH 4/4] Add survey to Visit string representation. The survey is generally more human-readable than the group ID. Since commissioning runs mix different kinds of observations (represented as different surveys), it's a valuable piece of context for understanding what kind of visit is being processed. --- python/activator/visit.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/activator/visit.py b/python/activator/visit.py index 3ab1cd14..33602dae 100644 --- a/python/activator/visit.py +++ b/python/activator/visit.py @@ -54,7 +54,7 @@ def __str__(self): """Return a short string that represents the visit but does not include complete metadata. """ - return f"(groupId={self.groupId}, salIndex={self.salIndex})" + return f"(groupId={self.groupId}, survey={self.survey}, salIndex={self.salIndex})" @dataclass(frozen=True, kw_only=True) @@ -67,7 +67,8 @@ def __str__(self): """Return a short string that disambiguates the visit but does not include "metadata" fields. """ - return f"(instrument={self.instrument}, groupId={self.groupId}, detector={self.detector})" + return f"(instrument={self.instrument}, groupId={self.groupId}, survey={self.survey} " \ + f"detector={self.detector})" def get_bare_visit(self): """Return visit-level info as a dict"""