Skip to content

Commit

Permalink
Merge pull request #283 from lsst/tickets/DM-42751
Browse files Browse the repository at this point in the history
DM-42751: Add num_proc parameter to SeparablePipelineExecutor.run_pipeline()
  • Loading branch information
timj committed Feb 1, 2024
2 parents 21651d4 + 5529847 commit 1ce94f1
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 7 deletions.
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ repos:
- id: trailing-whitespace
- id: check-toml
- repo: https://github.com/psf/black-pre-commit-mirror
rev: 23.12.1
rev: 24.1.1
hooks:
- id: black
# It is recommended to specify the latest version of Python
Expand All @@ -22,7 +22,7 @@ repos:
name: isort (python)
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.1.13
rev: v0.1.15
hooks:
- id: ruff
- repo: https://github.com/numpy/numpydoc
Expand Down
2 changes: 2 additions & 0 deletions doc/changes/DM-42751.api.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
``SeparablePipelineExecutor.run_pipeline`` has been modified to take a ``num_proc`` parameter to specify how many subprocesses can be used to execute the pipeline.
The default is now ``1`` (no spawning), which is a change from the previous behavior of using 80% of the available cores.
14 changes: 9 additions & 5 deletions python/lsst/ctrl/mpexec/separablePipelineExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
import datetime
import getpass
import logging
import math
import multiprocessing
from collections.abc import Iterable, Mapping
from typing import Any, Protocol

Expand Down Expand Up @@ -241,6 +239,7 @@ def run_pipeline(
graph: lsst.pipe.base.QuantumGraph,
fail_fast: bool = False,
graph_executor: QuantumGraphExecutor | None = None,
num_proc: int = 1,
) -> None:
"""Run a pipeline in the form of a prepared quantum graph.
Expand All @@ -252,11 +251,16 @@ def run_pipeline(
graph : `lsst.pipe.base.QuantumGraph`
The pipeline and datasets to execute.
fail_fast : `bool`, optional
If `True`, abort all (parallel) execution if any task fails (only
used with the default graph executor).
If `True`, abort all execution if any task fails when
running with multiple processes. Only used with the default graph
executor).
graph_executor : `lsst.ctrl.mpexec.QuantumGraphExecutor`, optional
A custom graph executor. By default, a new instance of
`lsst.ctrl.mpexec.MPGraphExecutor` is used.
num_proc : `int`, optional
The number of processes that can be used to run the pipeline. The
default value ensures that no subprocess is created. Only used with
the default graph executor.
"""
if not graph_executor:
quantum_executor = SingleQuantumExecutor(
Expand All @@ -267,7 +271,7 @@ def run_pipeline(
resources=self.resources,
)
graph_executor = MPGraphExecutor(
numProc=math.ceil(0.8 * multiprocessing.cpu_count()),
numProc=num_proc,
timeout=2_592_000.0, # In practice, timeout is never helpful; set to 30 days.
quantumExecutor=quantum_executor,
failFast=fail_fast,
Expand Down

0 comments on commit 1ce94f1

Please sign in to comment.