Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions python/activator/activator.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ def next_visit_handler() -> Tuple[str, int]:
return f"Bad Request: {msg}", 400
assert expected_visit.instrument == instrument_name, \
f"Expected {instrument_name}, received {expected_visit.instrument}."
if pipelines.get_pipeline_file(expected_visit) is None:
_log.info(f"No pipeline configured for {expected_visit}, skipping.")
return "No pipeline configured for the received visit.", 422

expid_set = set()

# Create a fresh MiddlewareInterface object to avoid accidental
Expand Down
27 changes: 20 additions & 7 deletions python/activator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class PipelinesConfig:
A string describing pipeline selection criteria. The current format is
a space-delimited list of mappings, each of which has the format
``(survey="<survey>")=<pipeline>``. The pipeline path may contain
environment variables. No key or value may contain the "=" character.
environment variables, or may be the keyword "None" to mean no pipeline
should be run. No key or value may contain the "=" character.
See examples below.

Notes
Expand All @@ -68,6 +69,12 @@ class PipelinesConfig:
... '(survey="")=${AP_PIPE_DIR}/pipelines/LSSTComCam/Isr.yaml ')
... # doctest: +ELLIPSIS
<config.PipelinesConfig object at 0x...>

A config that omits a pipeline for non-sky data:

>>> PipelinesConfig('(survey="TestSurvey")=/etc/pipelines/ApPipe.yaml '
... '(survey="Dome Flats")=None ') # doctest: +ELLIPSIS
<config.PipelinesConfig object at 0x...>
"""

def __init__(self, config: str):
Expand All @@ -86,12 +93,13 @@ def _parse_config(config: str) -> collections.abc.Mapping:
A string describing pipeline selection criteria. The current format
is a space-delimited list of mappings, each of which has the format
'(survey="<survey>")=<pipeline>'. The pipeline path may contain
environment variables. No key or value may contain the "="
environment variables, or may be the keyword "None" to mean no
pipeline should be run. No key or value may contain the "="
character.

Returns
-------
config : mapping [`str`, `str`]
config : mapping [`str`, `str` or `None`]
A mapping from the survey type to the pipeline to run for that
survey. A more complex key or container type may be needed in the
future, if other pipeline selection criteria are added.
Expand All @@ -109,7 +117,7 @@ def _parse_config(config: str) -> collections.abc.Mapping:
pos = 0
match = node.match(config, pos)
while match:
items[match['survey']] = match['filename']
items[match['survey']] = match['filename'] if match['filename'].lower() != "none" else None

pos = match.end()
match = node.match(config, pos)
Expand All @@ -128,10 +136,15 @@ def get_pipeline_file(self, visit: FannedOutVisit) -> str:

Returns
-------
pipeline : `str`
A path to a configured pipeline file.
pipeline : `str` or `None`
A path to a configured pipeline file. A value of `None` means that
*no* pipeline should be run on this visit.
"""
try:
return os.path.expandvars(self._mapping[visit.survey])
value = self._mapping[visit.survey]
except KeyError as e:
raise RuntimeError(f"Unsupported survey: {visit.survey}") from e
if value is not None:
return os.path.expandvars(value)
else:
return value
5 changes: 3 additions & 2 deletions python/activator/visit.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def __str__(self):
"""Return a short string that represents the visit but does not
include complete metadata.
"""
return f"(groupId={self.groupId}, salIndex={self.salIndex})"
return f"(groupId={self.groupId}, survey={self.survey}, salIndex={self.salIndex})"


@dataclass(frozen=True, kw_only=True)
Expand All @@ -67,7 +67,8 @@ def __str__(self):
"""Return a short string that disambiguates the visit but does not
include "metadata" fields.
"""
return f"(instrument={self.instrument}, groupId={self.groupId}, detector={self.detector})"
return f"(instrument={self.instrument}, groupId={self.groupId}, survey={self.survey} " \
f"detector={self.detector})"

def get_bare_visit(self):
"""Return visit-level info as a dict"""
Expand Down
10 changes: 10 additions & 0 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,16 @@ def test_space(self):
os.path.normpath(os.path.join(getPackageDir("ap_pipe"), "pipe lines", "Isr.yaml"))
)

def test_none(self):
config = PipelinesConfig('(survey="TestSurvey")=None shall pass/pipelines/SingleFrame.yaml '
'(survey="Camera Test")=None '
)
self.assertEqual(
config.get_pipeline_file(self.visit),
os.path.normpath(os.path.join("None shall pass", "pipelines", "SingleFrame.yaml"))
)
self.assertIsNone(config.get_pipeline_file(dataclasses.replace(self.visit, survey="Camera Test")))

def test_nomatch(self):
config = PipelinesConfig('(survey="TestSurvey")=/etc/pipelines/SingleFrame.yaml '
'(survey="CameraTest")=${AP_PIPE_DIR}/pipelines/Isr.yaml '
Expand Down
4 changes: 2 additions & 2 deletions tests/test_middleware_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ def test_export_outputs(self):
self.second_interface.export_outputs({self.second_data_id["exposure"]})

central_butler = Butler(self.central_repo.name, writeable=False)
date = datetime.datetime.now(datetime.timezone.utc)
date = datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=-12)))
export_collection = f"{instname}/prompt/output-{date.year:04d}-{date.month:02d}-{date.day:02d}" \
"/ApPipe/prompt-proto-service-042"
self.assertEqual(self._count_datasets(central_butler, "calexp", export_collection), 2)
Expand Down Expand Up @@ -778,7 +778,7 @@ def test_export_outputs_retry(self):
self.second_interface.export_outputs({self.second_data_id["exposure"]})

central_butler = Butler(self.central_repo.name, writeable=False)
date = datetime.datetime.now(datetime.timezone.utc)
date = datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=-12)))
export_collection = f"{instname}/prompt/output-{date.year:04d}-{date.month:02d}-{date.day:02d}" \
"/ApPipe/prompt-proto-service-042"
self.assertEqual(self._count_datasets(central_butler, "calexp", export_collection), 2)
Expand Down