diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index c3b8e111..0b4756f4 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -35,7 +35,8 @@ from lsst.resources import ResourcePath import lsst.afw.cameraGeom -from lsst.ctrl.mpexec import SeparablePipelineExecutor +import lsst.ctrl.mpexec +from lsst.ctrl.mpexec import SeparablePipelineExecutor, SingleQuantumExecutor, MPGraphExecutor from lsst.daf.butler import Butler, CollectionType import lsst.dax.apdb import lsst.geom @@ -776,6 +777,36 @@ def ingest_image(self, oid: str) -> None: assert len(result) == 1, "Should have ingested exactly one image." _log.info("Ingested one %s with dataId=%s", result[0].datasetType.name, result[0].dataId) + def _get_graph_executor(self, butler, factory): + """Create a QuantumGraphExecutor suitable for Prompt Processing. + + Parameters + ---------- + butler : `lsst.daf.butler.Butler` + The Butler for which the quantum graph will be generated + and executed. Should match the Butler passed to + SeparablePipelineExecutor. + factory : `lsst.pipe.base.TaskFactory` + The task factory used for pipeline execution. Should match + the factory passed to SeparablePipelineExecutor. + + Returns + ------- + executor : `lsst.ctrl.mpexec.QuantumGraphExecutor` + The executor to use. + """ + quantum_executor = SingleQuantumExecutor( + butler, + factory, + ) + graph_executor = MPGraphExecutor( + # TODO: re-enable parallel execution once we can log as desired with CliLog or a successor + numProc=1, # Avoid spawning processes, because they bypass our logger + timeout=2_592_000.0, # In practice, timeout is never helpful; set to 30 days. + quantumExecutor=quantum_executor, + ) + return graph_executor + def run_pipeline(self, exposure_ids: set[int]) -> None: """Process the received image(s). @@ -818,12 +849,15 @@ def run_pipeline(self, exposure_ids: set[int]) -> None: raise RuntimeError from e init_output_run = self._get_init_output_run(pipeline_file, self._day_obs) output_run = self._get_output_run(pipeline_file, self._day_obs) + exec_butler = Butler(butler=self.butler, + collections=[output_run, init_output_run] + list(self.butler.collections), + run=output_run) + factory = lsst.ctrl.mpexec.TaskFactory() executor = SeparablePipelineExecutor( - Butler(butler=self.butler, - collections=[output_run, init_output_run] + list(self.butler.collections), - run=output_run), + exec_butler, clobber_output=False, - skip_existing_in=None + skip_existing_in=None, + task_factory=factory, ) qgraph = executor.make_quantum_graph(pipeline, where=where) if len(qgraph) == 0: @@ -840,7 +874,7 @@ def run_pipeline(self, exposure_ids: set[int]) -> None: executor.pre_execute_qgraph(qgraph, register_dataset_types=True, save_init_outputs=True) _log.info(f"Running '{pipeline._pipelineIR.description}' on {where}") try: - executor.run_pipeline(qgraph) + executor.run_pipeline(qgraph, graph_executor=self._get_graph_executor(exec_butler, factory)) _log.info("Pipeline successfully run.") except Exception as e: state_changed = True # better safe than sorry