Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 40 additions & 6 deletions python/activator/middleware_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@

from lsst.resources import ResourcePath
import lsst.afw.cameraGeom
from lsst.ctrl.mpexec import SeparablePipelineExecutor
import lsst.ctrl.mpexec
from lsst.ctrl.mpexec import SeparablePipelineExecutor, SingleQuantumExecutor, MPGraphExecutor
from lsst.daf.butler import Butler, CollectionType
import lsst.dax.apdb
import lsst.geom
Expand Down Expand Up @@ -776,6 +777,36 @@ def ingest_image(self, oid: str) -> None:
assert len(result) == 1, "Should have ingested exactly one image."
_log.info("Ingested one %s with dataId=%s", result[0].datasetType.name, result[0].dataId)

def _get_graph_executor(self, butler, factory):
"""Create a QuantumGraphExecutor suitable for Prompt Processing.

Parameters
----------
butler : `lsst.daf.butler.Butler`
The Butler for which the quantum graph will be generated
and executed. Should match the Butler passed to
SeparablePipelineExecutor.
factory : `lsst.pipe.base.TaskFactory`
The task factory used for pipeline execution. Should match
the factory passed to SeparablePipelineExecutor.

Returns
-------
executor : `lsst.ctrl.mpexec.QuantumGraphExecutor`
The executor to use.
"""
quantum_executor = SingleQuantumExecutor(
butler,
factory,
)
graph_executor = MPGraphExecutor(
# TODO: re-enable parallel execution once we can log as desired with CliLog or a successor
numProc=1, # Avoid spawning processes, because they bypass our logger
timeout=2_592_000.0, # In practice, timeout is never helpful; set to 30 days.
quantumExecutor=quantum_executor,
)
return graph_executor

def run_pipeline(self, exposure_ids: set[int]) -> None:
"""Process the received image(s).

Expand Down Expand Up @@ -818,12 +849,15 @@ def run_pipeline(self, exposure_ids: set[int]) -> None:
raise RuntimeError from e
init_output_run = self._get_init_output_run(pipeline_file, self._day_obs)
output_run = self._get_output_run(pipeline_file, self._day_obs)
exec_butler = Butler(butler=self.butler,
collections=[output_run, init_output_run] + list(self.butler.collections),
run=output_run)
factory = lsst.ctrl.mpexec.TaskFactory()
executor = SeparablePipelineExecutor(
Butler(butler=self.butler,
collections=[output_run, init_output_run] + list(self.butler.collections),
run=output_run),
exec_butler,
clobber_output=False,
skip_existing_in=None
skip_existing_in=None,
task_factory=factory,
)
qgraph = executor.make_quantum_graph(pipeline, where=where)
if len(qgraph) == 0:
Expand All @@ -840,7 +874,7 @@ def run_pipeline(self, exposure_ids: set[int]) -> None:
executor.pre_execute_qgraph(qgraph, register_dataset_types=True, save_init_outputs=True)
_log.info(f"Running '{pipeline._pipelineIR.description}' on {where}")
try:
executor.run_pipeline(qgraph)
executor.run_pipeline(qgraph, graph_executor=self._get_graph_executor(exec_butler, factory))
_log.info("Pipeline successfully run.")
except Exception as e:
state_changed = True # better safe than sorry
Expand Down