Skip to content

Commit 75ee212

Browse files
committed
Merge branch 'tickets/DM-47829'
2 parents ea47e0b + 189a4eb commit 75ee212

File tree

2 files changed

+113
-21
lines changed

2 files changed

+113
-21
lines changed

python/activator/middleware_interface.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1367,6 +1367,9 @@ def _check_permanent_changes(self, where: str) -> bool:
13671367
data_id = data_ids.pop()
13681368
apdb = lsst.dax.apdb.Apdb.from_uri(self._apdb_config)
13691369
return apdb.containsVisitDetector(data_id["visit"], self.visit.detector)
1370+
elif not data_ids:
1371+
# Engineering exposures don't produce visits, but they also can't write to the APDB.
1372+
return False
13701373
else:
13711374
# Don't know how this could happen, so won't try to handle it gracefully.
13721375
_log.warning("Unexpected visit ids: %s. Assuming APDB modified.", data_ids)

tests/test_middleware_interface.py

Lines changed: 110 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151

5252
from activator.caching import DatasetCache
5353
from activator.config import PipelinesConfig
54-
from activator.exception import NonRetriableError
54+
from activator.exception import NonRetriableError, NoGoodPipelinesError, PipelineExecutionError
5555
from activator.visit import FannedOutVisit
5656
from activator.middleware_interface import get_central_butler, flush_local_repo, make_local_repo, \
5757
_get_sasquatch_dispatcher, MiddlewareInterface, \
@@ -71,6 +71,9 @@
7171
"${PROMPT_PROCESSING_DIR}/tests/data/SingleFrame.yaml",
7272
],
7373
}])
74+
pipelines_minimal = PipelinesConfig([{"survey": "SURVEY",
75+
"pipelines": ["${PROMPT_PROCESSING_DIR}/tests/data/ISR.yaml", ],
76+
}])
7477
pre_pipelines_empty = PipelinesConfig([{"survey": "SURVEY", "pipelines": None}])
7578
pre_pipelines_full = PipelinesConfig([{"survey": "SURVEY",
7679
"pipelines": ["${PROMPT_PROCESSING_DIR}/tests/data/Preprocess.yaml",
@@ -132,6 +135,60 @@ def fake_file_data(filename, dimensions, instrument, visit):
132135
return data_id, file_data
133136

134137

138+
# TODO: merge this into fake_file_data after DM-46152
139+
def fake_eng_data(filename, dimensions, instrument, visit):
140+
"""Return file data for a mock non-science file to be ingested.
141+
142+
Parameters
143+
----------
144+
filename : `str`
145+
Full path to the file to mock. Can be a non-existant file.
146+
dimensions : `lsst.daf.butler.DimensionsUniverse`
147+
The full set of dimensions for this butler.
148+
instrument : `lsst.obs.base.Instrument`
149+
The instrument the file is supposed to be from.
150+
visit : `FannedOutVisit`
151+
Group of snaps from one detector to be processed.
152+
153+
Returns
154+
-------
155+
data_id, file_data, : `DataCoordinate`, `RawFileData`
156+
The id and descriptor for the mock file.
157+
"""
158+
exposure_id = int(visit.groupId)
159+
data_id = DataCoordinate.standardize({"exposure": exposure_id,
160+
"detector": visit.detector,
161+
"instrument": instrument.getName()},
162+
universe=dimensions)
163+
164+
start_time = astropy.time.Time("2024-06-17T22:06:15", scale="tai")
165+
day_obs = 20240617
166+
obs_info = astro_metadata_translator.makeObservationInfo(
167+
instrument=instrument.getName(),
168+
datetime_begin=start_time,
169+
datetime_end=start_time + 30*u.second,
170+
exposure_id=exposure_id,
171+
exposure_group=visit.groupId,
172+
visit_id=None,
173+
boresight_rotation_angle=None,
174+
boresight_rotation_coord=None,
175+
tracking_radec=None,
176+
observation_id=visit.groupId,
177+
physical_filter=filter,
178+
exposure_time=30.0*u.second,
179+
observation_type="goofing off",
180+
observing_day=day_obs,
181+
group_counter_start=exposure_id,
182+
group_counter_end=exposure_id,
183+
)
184+
dataset_info = RawFileDatasetInfo(data_id, obs_info)
185+
file_data = RawFileData([dataset_info],
186+
lsst.resources.ResourcePath(filename),
187+
FitsImageFormatter,
188+
instrument)
189+
return data_id, file_data
190+
191+
135192
class MiddlewareInterfaceTest(unittest.TestCase):
136193
"""Test the MiddlewareInterface class with faked data.
137194
"""
@@ -360,7 +417,7 @@ def test_prep_butler_nofilter(self):
360417
def test_prep_butler_notemplates(self):
361418
"""Test that prep_butler can handle pipeline configs without templates.
362419
"""
363-
self.interface.main_pipelines = pre_pipelines_empty
420+
self.interface.main_pipelines = pipelines_minimal
364421
with unittest.mock.patch("activator.middleware_interface.MiddlewareInterface._run_preprocessing") \
365422
as mock_pre, \
366423
self.assertNoLogs(level="ERROR"):
@@ -475,10 +532,10 @@ def test_ingest_image_fails_missing_file(self):
475532
self.interface.prep_butler() # Ensure raw collections exist.
476533
filename = "nonexistentImage.fits"
477534
filepath = os.path.join(self.input_data, filename)
478-
data_id, file_data = fake_file_data(filepath,
479-
self.interface.butler.dimensions,
480-
self.interface.instrument,
481-
self.next_visit)
535+
_, file_data = fake_file_data(filepath,
536+
self.interface.butler.dimensions,
537+
self.interface.instrument,
538+
self.next_visit)
482539
with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock, \
483540
self.assertRaisesRegex(FileNotFoundError, "Resource at .* does not exist"):
484541
mock.return_value = file_data
@@ -498,14 +555,29 @@ def _prepare_run_pipeline(self):
498555

499556
filename = "fakeRawImage.fits"
500557
filepath = os.path.join(self.input_data, filename)
501-
data_id, file_data = fake_file_data(filepath,
502-
self.interface.butler.dimensions,
503-
self.interface.instrument,
504-
self.next_visit)
558+
_, file_data = fake_file_data(filepath,
559+
self.interface.butler.dimensions,
560+
self.interface.instrument,
561+
self.next_visit)
505562
with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock:
506563
mock.return_value = file_data
507564
self.interface.ingest_image(filename)
508-
565+
# Dummy "engineering" visit to test non-science handling
566+
eng_visit = dataclasses.replace(self.next_visit,
567+
groupId="42",
568+
coordinateSystem=FannedOutVisit.CoordSys.NONE,
569+
position=[0.0, 0.0],
570+
rotationSystem=FannedOutVisit.RotSys.NONE,
571+
cameraAngle=0.0,
572+
dome=FannedOutVisit.Dome.CLOSED,
573+
)
574+
_, eng_data = fake_eng_data(filepath,
575+
self.interface.butler.dimensions,
576+
self.interface.instrument,
577+
eng_visit)
578+
with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock:
579+
mock.return_value = eng_data
580+
self.interface.ingest_image(filename)
509581
# TODO: add any preprocessing outputs the main pipeline depends on (DM-43418?)
510582

511583
def test_run_pipeline(self):
@@ -594,7 +666,7 @@ def test_run_pipeline_fallback_2failof2(self):
594666
expected = ""
595667

596668
self._prepare_run_pipeline()
597-
with self.assertRaises(RuntimeError):
669+
with self.assertRaises(NoGoodPipelinesError):
598670
self._check_run_pipeline_fallback(lambda: self.interface.run_pipeline({1}),
599671
pipe_list, graph_list, expected)
600672

@@ -650,10 +722,10 @@ def test_run_pipeline_bad_visits(self):
650722
self.interface.prep_butler()
651723
filename = "fakeRawImage.fits"
652724
filepath = os.path.join(self.input_data, filename)
653-
data_id, file_data = fake_file_data(filepath,
654-
self.interface.butler.dimensions,
655-
self.interface.instrument,
656-
self.next_visit)
725+
_, file_data = fake_file_data(filepath,
726+
self.interface.butler.dimensions,
727+
self.interface.instrument,
728+
self.next_visit)
657729
with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock:
658730
mock.return_value = file_data
659731
self.interface.ingest_image(filename)
@@ -671,9 +743,9 @@ def test_run_pipeline_early_exception(self):
671743
unittest.mock.patch("activator.middleware_interface.SeparablePipelineExecutor.run_pipeline") \
672744
as mock_run, \
673745
unittest.mock.patch("lsst.dax.apdb.ApdbSql.containsVisitDetector") as mock_query:
674-
mock_run.side_effect = RuntimeError("The pipeline doesn't like you.")
746+
mock_run.side_effect = ValueError("Error: not computable")
675747
mock_query.return_value = False
676-
with self.assertRaises(RuntimeError):
748+
with self.assertRaises(PipelineExecutionError):
677749
self.interface.run_pipeline({1})
678750

679751
def test_run_pipeline_late_exception(self):
@@ -686,7 +758,7 @@ def test_run_pipeline_late_exception(self):
686758
unittest.mock.patch("activator.middleware_interface.SeparablePipelineExecutor.run_pipeline") \
687759
as mock_run, \
688760
unittest.mock.patch("lsst.dax.apdb.ApdbSql.containsVisitDetector") as mock_query:
689-
mock_run.side_effect = RuntimeError("The pipeline doesn't like you.")
761+
mock_run.side_effect = ValueError("Error: not computable")
690762
mock_query.return_value = True
691763
with self.assertRaises(NonRetriableError):
692764
self.interface.run_pipeline({1})
@@ -701,11 +773,28 @@ def test_run_pipeline_cascading_exception(self):
701773
unittest.mock.patch("activator.middleware_interface.SeparablePipelineExecutor.run_pipeline") \
702774
as mock_run, \
703775
unittest.mock.patch("lsst.dax.apdb.ApdbSql.containsVisitDetector") as mock_query:
704-
mock_run.side_effect = RuntimeError("The pipeline doesn't like you.")
776+
mock_run.side_effect = ValueError("Error: not computable")
705777
mock_query.side_effect = psycopg2.OperationalError("Database? What database?")
706778
with self.assertRaises(NonRetriableError):
707779
self.interface.run_pipeline({1})
708780

781+
def test_run_pipeline_early_exception_novisit(self):
782+
"""Test behavior when execution fails in ISR-only processing.
783+
"""
784+
self._prepare_run_pipeline()
785+
786+
with unittest.mock.patch(
787+
"activator.middleware_interface.SeparablePipelineExecutor.pre_execute_qgraph"), \
788+
unittest.mock.patch("activator.middleware_interface.SeparablePipelineExecutor.run_pipeline") \
789+
as mock_run, \
790+
unittest.mock.patch.object(self.interface, "main_pipelines", pipelines_minimal), \
791+
unittest.mock.patch("lsst.dax.apdb.ApdbSql.containsVisitDetector") as mock_query:
792+
mock_run.side_effect = ValueError("Error: not computable")
793+
mock_query.return_value = False
794+
with self.assertRaises(PipelineExecutionError):
795+
# Engineering run; see _prepare_run_pipeline
796+
self.interface.run_pipeline({42})
797+
709798
def test_run_preprocessing_empty(self):
710799
"""Test that running the preprocessiing pipeline does nothing if no
711800
pipelines configured.
@@ -767,7 +856,7 @@ def test_run_preprocessing_fallback_2failof2(self):
767856
expected = ""
768857

769858
self._prepare_run_preprocessing()
770-
with self.assertRaises(RuntimeError):
859+
with self.assertRaises(NoGoodPipelinesError):
771860
self._check_run_pipeline_fallback(self.interface._run_preprocessing,
772861
pipe_list, graph_list, expected)
773862

0 commit comments

Comments
 (0)