From 8540f1fdd9087899defc1faf3d6c1be97aa403ea Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Wed, 6 Dec 2023 15:34:19 -0800 Subject: [PATCH 1/3] Factor out pipeline execution Butler. Assigning the temporary Butler to a local variable makes it usable at other pipeline execution steps. --- python/activator/middleware_interface.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index c3b8e111..0c573e64 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -818,12 +818,13 @@ def run_pipeline(self, exposure_ids: set[int]) -> None: raise RuntimeError from e init_output_run = self._get_init_output_run(pipeline_file, self._day_obs) output_run = self._get_output_run(pipeline_file, self._day_obs) + exec_butler = Butler(butler=self.butler, + collections=[output_run, init_output_run] + list(self.butler.collections), + run=output_run) executor = SeparablePipelineExecutor( - Butler(butler=self.butler, - collections=[output_run, init_output_run] + list(self.butler.collections), - run=output_run), + exec_butler, clobber_output=False, - skip_existing_in=None + skip_existing_in=None, ) qgraph = executor.make_quantum_graph(pipeline, where=where) if len(qgraph) == 0: From 2bcc0d3d847dd4435133087e3379f27eafb36698 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Wed, 6 Dec 2023 15:38:53 -0800 Subject: [PATCH 2/3] Explicitly construct a task factory for pipeline execution. This factory can be shared among multiple components. The explicit namespace is to disambiguate ctrl.mpexec.TaskFactory from pipe.base.TaskFactory. --- python/activator/middleware_interface.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 0c573e64..7a2658f2 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -35,6 +35,7 @@ from lsst.resources import ResourcePath import lsst.afw.cameraGeom +import lsst.ctrl.mpexec from lsst.ctrl.mpexec import SeparablePipelineExecutor from lsst.daf.butler import Butler, CollectionType import lsst.dax.apdb @@ -821,10 +822,12 @@ def run_pipeline(self, exposure_ids: set[int]) -> None: exec_butler = Butler(butler=self.butler, collections=[output_run, init_output_run] + list(self.butler.collections), run=output_run) + factory = lsst.ctrl.mpexec.TaskFactory() executor = SeparablePipelineExecutor( exec_butler, clobber_output=False, skip_existing_in=None, + task_factory=factory, ) qgraph = executor.make_quantum_graph(pipeline, where=where) if len(qgraph) == 0: From 8eac6b09714276400f6025e9d703c3f72ca31ce4 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Wed, 6 Dec 2023 13:53:12 -0800 Subject: [PATCH 3/3] Use custom configuration of MPGraphExecutor. This config never uses multiprocessing, avoiding problems associated with spawning. --- python/activator/middleware_interface.py | 35 ++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 7a2658f2..511dc23f 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -36,7 +36,7 @@ from lsst.resources import ResourcePath import lsst.afw.cameraGeom import lsst.ctrl.mpexec -from lsst.ctrl.mpexec import SeparablePipelineExecutor +from lsst.ctrl.mpexec import SeparablePipelineExecutor, SingleQuantumExecutor, MPGraphExecutor from lsst.daf.butler import Butler, CollectionType import lsst.dax.apdb import lsst.geom @@ -777,6 +777,37 @@ def ingest_image(self, oid: str) -> None: assert len(result) == 1, "Should have ingested exactly one image." _log.info("Ingested one %s with dataId=%s", result[0].datasetType.name, result[0].dataId) + def _get_graph_executor(self, butler, factory): + """Create a QuantumGraphExecutor suitable for Prompt Processing. + + Parameters + ---------- + butler : `lsst.daf.butler.Butler` + The Butler for which the quantum graph will be generated + and executed. Should match the Butler passed to + SeparablePipelineExecutor. + factory : `lsst.pipe.base.TaskFactory` + The task factory used for pipeline execution. Should match + the factory passed to SeparablePipelineExecutor. + + Returns + ------- + executor : `lsst.ctrl.mpexec.QuantumGraphExecutor` + The executor to use. + """ + quantum_executor = SingleQuantumExecutor( + butler, + factory, + ) + graph_executor = MPGraphExecutor( + # TODO: re-enable parallel execution once we can log as desired with CliLog or a successor + # (see issues linked from DM-42063) + numProc=1, # Avoid spawning processes, because they bypass our logger + timeout=2_592_000.0, # In practice, timeout is never helpful; set to 30 days. + quantumExecutor=quantum_executor, + ) + return graph_executor + def run_pipeline(self, exposure_ids: set[int]) -> None: """Process the received image(s). @@ -844,7 +875,7 @@ def run_pipeline(self, exposure_ids: set[int]) -> None: executor.pre_execute_qgraph(qgraph, register_dataset_types=True, save_init_outputs=True) _log.info(f"Running '{pipeline._pipelineIR.description}' on {where}") try: - executor.run_pipeline(qgraph) + executor.run_pipeline(qgraph, graph_executor=self._get_graph_executor(exec_butler, factory)) _log.info("Pipeline successfully run.") except Exception as e: state_changed = True # better safe than sorry