From fc4930943c978c8b6c33865eacba85a9bebd220e Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Sun, 27 Jul 2025 10:52:17 -0400 Subject: [PATCH] Update ctrl_mpexec imports to pipe_base. --- python/activator/middleware_interface.py | 19 ++++++++++--------- ups/prompt_processing.table | 1 - 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 6747b151..ff58eb6d 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -41,8 +41,9 @@ from lsst.resources import ResourcePath import lsst.sphgeom import lsst.afw.cameraGeom -import lsst.ctrl.mpexec -from lsst.ctrl.mpexec import SeparablePipelineExecutor, SingleQuantumExecutor, MPGraphExecutor +from lsst.pipe.base.mp_graph_executor import MPGraphExecutor +from lsst.pipe.base.separable_pipeline_executor import SeparablePipelineExecutor +from lsst.pipe.base.single_quantum_executor import SingleQuantumExecutor from lsst.daf.butler import Butler, CollectionType, DatasetType, Timespan, \ DataIdValueError, MissingDatasetTypeError, MissingCollectionError import lsst.dax.apdb @@ -1229,21 +1230,21 @@ def _get_graph_executor(self, butler, factory): Returns ------- - executor : `lsst.ctrl.mpexec.QuantumGraphExecutor` + executor : `lsst.pipe.base.quantum_graph_executor.QuantumGraphExecutor` The executor to use. """ quantum_executor = SingleQuantumExecutor( - butler, - factory, - assumeNoExistingOutputs=True, # Outputs cleared out on success *or* failure + butler=butler, + task_factory=factory, + assume_no_existing_outputs=True, # Outputs cleared out on success *or* failure raise_on_partial_outputs=True, # Only way to detect that partial outputs happened ) graph_executor = MPGraphExecutor( # TODO: re-enable parallel execution once we can log as desired with CliLog or a successor # (see issues linked from DM-42063) - numProc=1, # Avoid spawning processes, because they bypass our logger + num_proc=1, # Avoid spawning processes, because they bypass our logger timeout=2_592_000.0, # In practice, timeout is never helpful; set to 30 days. - quantumExecutor=quantum_executor, + quantum_executor=quantum_executor, ) return graph_executor @@ -1297,7 +1298,7 @@ def _try_pipelines(self, pipelines, in_collections, data_ids, *, label): + in_collections + list(self.butler.collections.defaults), run=output_run) - factory = lsst.ctrl.mpexec.TaskFactory() + factory = lsst.pipe.base.TaskFactory() executor = SeparablePipelineExecutor( exec_butler, clobber_output=False, diff --git a/ups/prompt_processing.table b/ups/prompt_processing.table index f8ba9a43..3e9008f8 100644 --- a/ups/prompt_processing.table +++ b/ups/prompt_processing.table @@ -13,7 +13,6 @@ setupRequired(obs_lsst) # Used by middleware_interface setupRequired(analysis_tools) setupRequired(daf_butler) -setupRequired(ctrl_mpexec) setupRequired(geom) setupRequired(meas_algorithms) setupRequired(obs_base)