Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 41 additions & 6 deletions python/activator/middleware_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@

from lsst.resources import ResourcePath
import lsst.afw.cameraGeom
from lsst.ctrl.mpexec import SeparablePipelineExecutor
import lsst.ctrl.mpexec
from lsst.ctrl.mpexec import SeparablePipelineExecutor, SingleQuantumExecutor, MPGraphExecutor
from lsst.daf.butler import Butler, CollectionType
import lsst.dax.apdb
import lsst.geom
Expand Down Expand Up @@ -776,6 +777,37 @@ def ingest_image(self, oid: str) -> None:
assert len(result) == 1, "Should have ingested exactly one image."
_log.info("Ingested one %s with dataId=%s", result[0].datasetType.name, result[0].dataId)

def _get_graph_executor(self, butler, factory):
"""Create a QuantumGraphExecutor suitable for Prompt Processing.

Parameters
----------
butler : `lsst.daf.butler.Butler`
The Butler for which the quantum graph will be generated
and executed. Should match the Butler passed to
SeparablePipelineExecutor.
factory : `lsst.pipe.base.TaskFactory`
The task factory used for pipeline execution. Should match
the factory passed to SeparablePipelineExecutor.

Returns
-------
executor : `lsst.ctrl.mpexec.QuantumGraphExecutor`
The executor to use.
"""
quantum_executor = SingleQuantumExecutor(
butler,
factory,
)
graph_executor = MPGraphExecutor(
# TODO: re-enable parallel execution once we can log as desired with CliLog or a successor
# (see issues linked from DM-42063)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a good workaround for the time being. Maybe we should create a new ticket for the future work discussed at Slack on more properly engineering this even if that may not happen soon?

Copy link
Member Author

@kfindeisen kfindeisen Dec 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it will need to be done piece by piece, and would prefer to avoid a Big Design Discussion that tries to solve everything at once. I hope to discuss the details with Middleware after catching up on backlog.

numProc=1, # Avoid spawning processes, because they bypass our logger
timeout=2_592_000.0, # In practice, timeout is never helpful; set to 30 days.
quantumExecutor=quantum_executor,
)
return graph_executor

def run_pipeline(self, exposure_ids: set[int]) -> None:
"""Process the received image(s).

Expand Down Expand Up @@ -818,12 +850,15 @@ def run_pipeline(self, exposure_ids: set[int]) -> None:
raise RuntimeError from e
init_output_run = self._get_init_output_run(pipeline_file, self._day_obs)
output_run = self._get_output_run(pipeline_file, self._day_obs)
exec_butler = Butler(butler=self.butler,
collections=[output_run, init_output_run] + list(self.butler.collections),
run=output_run)
factory = lsst.ctrl.mpexec.TaskFactory()
executor = SeparablePipelineExecutor(
Butler(butler=self.butler,
collections=[output_run, init_output_run] + list(self.butler.collections),
run=output_run),
exec_butler,
clobber_output=False,
skip_existing_in=None
skip_existing_in=None,
task_factory=factory,
)
qgraph = executor.make_quantum_graph(pipeline, where=where)
if len(qgraph) == 0:
Expand All @@ -840,7 +875,7 @@ def run_pipeline(self, exposure_ids: set[int]) -> None:
executor.pre_execute_qgraph(qgraph, register_dataset_types=True, save_init_outputs=True)
_log.info(f"Running '{pipeline._pipelineIR.description}' on {where}")
try:
executor.run_pipeline(qgraph)
executor.run_pipeline(qgraph, graph_executor=self._get_graph_executor(exec_butler, factory))
_log.info("Pipeline successfully run.")
except Exception as e:
state_changed = True # better safe than sorry
Expand Down