Skip to content

Commit

Permalink
Add --cores-per-quantum and --memory-per-quantum options to pipetask run
Browse files Browse the repository at this point in the history
  • Loading branch information
timj committed Jun 21, 2023
1 parent 6d1e974 commit 4b2eee5
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 0 deletions.
2 changes: 2 additions & 0 deletions python/lsst/ctrl/mpexec/cli/opt/optionGroups.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ def __init__(self) -> None:
ctrlMpExecOpts.graph_fixup_option(),
ctrlMpExecOpts.summary_option(),
ctrlMpExecOpts.enable_implicit_threading_option(),
ctrlMpExecOpts.cores_per_quantum_option(),
ctrlMpExecOpts.memory_per_quantum_option(),
]


Expand Down
23 changes: 23 additions & 0 deletions python/lsst/ctrl/mpexec/cli/opt/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,29 @@
is_flag=True,
)

cores_per_quantum_option = MWOptionDecorator(
"-n",
"--cores-per-quantum",
default=1,
help=unwrap(
"""Number of cores available to each quantum when executing.
If '-j' is used each subprocess will be allowed to use this number of cores."""
),
type=click.IntRange(min=1),
)

memory_per_quantum_option = MWOptionDecorator(
"--memory-per-quantum",
default="",
help=unwrap(
"""Memory allocated for each quantum to use when executing.
If '-j' used each subprocess will be allowed to use this amount of memory.
Units are allowed and the default units for a plain integer are MB.
For example: '3GB', '3000MB' and '3000' would all result in the same
memory limit. Default is for no limit."""
),
type=str,
)

task_option = MWOptionDecorator(
"-t",
Expand Down
10 changes: 10 additions & 0 deletions python/lsst/ctrl/mpexec/cli/script/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ def run( # type: ignore
unmocked_dataset_types,
mock_failure,
enable_implicit_threading,
cores_per_quantum: int,
memory_per_quantum: str,
**kwargs,
):
"""Implement the command line interface `pipetask run` subcommand.
Expand Down Expand Up @@ -159,6 +161,12 @@ def run( # type: ignore
If `True`, do not disable implicit threading by third-party libraries.
Implicit threading is always disabled during actual quantum execution
if ``processes > 1``.
cores_per_quantum : `int`
Number of cores that can be used by each quantum. Defaults to 1.
memory_per_quantum : `str`
Amount of memory that each quantum can be allowed to use. Empty string
implies no limit. The string can be either a single integer (implying
units of MB) or a combination of number and unit.
kwargs : `dict` [`str`, `str`]
Ignored; click commands may accept options for more than one script
function and pass all the option kwargs to each of the script functions
Expand Down Expand Up @@ -191,6 +199,8 @@ def run( # type: ignore
summary=summary,
# Mock options only used by qgraph.
enable_implicit_threading=enable_implicit_threading,
cores_per_quantum=cores_per_quantum,
memory_per_quantum=memory_per_quantum,
)

f = CmdLineFwk()
Expand Down
10 changes: 10 additions & 0 deletions python/lsst/ctrl/mpexec/cli/script/run_qbb.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ def run_qbb(
fail_fast: bool,
summary: str | None,
enable_implicit_threading: bool,
cores_per_quantum: int,
memory_per_quantum: str,
) -> None:
"""Implement the command line interface ``pipetask run-qbb`` subcommand.
Expand Down Expand Up @@ -82,6 +84,12 @@ def run_qbb(
If `True`, do not disable implicit threading by third-party libraries.
Implicit threading is always disabled during actual quantum execution
if ``processes > 1``.
cores_per_quantum : `int`
Number of cores that can be used by each quantum. Defaults to 1.
memory_per_quantum : `str`
Amount of memory that each quantum can be allowed to use. Empty string
implies no limit. The string can be either a single integer (implying
units of MB) or a combination of number and unit.
"""
args = SimpleNamespace(
butler_config=butler_config,
Expand All @@ -98,6 +106,8 @@ def run_qbb(
fail_fast=fail_fast,
summary=summary,
enable_implicit_threading=enable_implicit_threading,
cores_per_quantum=cores_per_quantum,
memory_per_quantum=memory_per_quantum,
)

f = CmdLineFwk()
Expand Down
44 changes: 44 additions & 0 deletions python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from types import SimpleNamespace
from typing import TYPE_CHECKING

import astropy.units as u
from astropy.table import Table
from lsst.daf.butler import (
Butler,
Expand All @@ -48,6 +49,7 @@
from lsst.daf.butler.registry import MissingCollectionError, RegistryDefaults
from lsst.daf.butler.registry.wildcards import CollectionWildcard
from lsst.pipe.base import (
ExecutionResources,
GraphBuilder,
Instrument,
Pipeline,
Expand Down Expand Up @@ -697,6 +699,44 @@ def builderShim(butler: Butler) -> Butler:

return qgraph

def _make_execution_resources(self, args: SimpleNamespace) -> ExecutionResources:
"""Construct the execution resource class from arguments.
Parameters
----------
args : `types.SimpleNamespace`
Parsed command line.
Returns
-------
resources : `~lsst.pipe.base.ExecutionResources`
The resources available to each quantum.
"""
# The memory per quantum is a string that needs to be parsed.
mem = None

if args.memory_per_quantum:
try:
mem = int(args.memory_per_quantum)
except ValueError:
pass

Check warning on line 722 in python/lsst/ctrl/mpexec/cmdLineFwk.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/cmdLineFwk.py#L719-L722

Added lines #L719 - L722 were not covered by tests
else:
mem *= u.MB # Default is for megabytes.

Check warning on line 724 in python/lsst/ctrl/mpexec/cmdLineFwk.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/cmdLineFwk.py#L724

Added line #L724 was not covered by tests

if not mem:
try:
mem = u.Quantity(args.memory_per_quantum)
except Exception as e:
_LOG.warning(

Check warning on line 730 in python/lsst/ctrl/mpexec/cmdLineFwk.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/cmdLineFwk.py#L727-L730

Added lines #L727 - L730 were not covered by tests
"Unable to parse the quantity from --memory-per-quantum parameter of %r. "
"Ignoring the parameter. Got error: %s",
args.memory_per_quantum,
e,
)

# This could fail if someone specifies a unit of meters or tesla, etc.
return ExecutionResources(num_cores=args.cores_per_quantum, max_mem=mem)

def runPipeline(
self,
graph: QuantumGraph,
Expand Down Expand Up @@ -769,13 +809,15 @@ def runPipeline(

if not args.init_only:
graphFixup = self._importGraphFixup(args)
resources = self._make_execution_resources(args)
quantumExecutor = SingleQuantumExecutor(
butler,
taskFactory,
skipExistingIn=args.skip_existing_in,
clobberOutputs=args.clobber_outputs,
enableLsstDebug=args.enableLsstDebug,
exitOnKnownError=args.fail_fast,
resources=resources,
)

timeout = self.MP_TIMEOUT if args.timeout is None else args.timeout
Expand Down Expand Up @@ -918,12 +960,14 @@ def runGraphQBB(self, task_factory: TaskFactory, args: SimpleNamespace) -> None:
)

# make special quantum executor
resources = self._make_execution_resources(args)

Check warning on line 963 in python/lsst/ctrl/mpexec/cmdLineFwk.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/cmdLineFwk.py#L963

Added line #L963 was not covered by tests
quantumExecutor = SingleQuantumExecutor(
butler=None,
taskFactory=task_factory,
enableLsstDebug=args.enableLsstDebug,
exitOnKnownError=args.fail_fast,
limited_butler_factory=_butler_factory,
resources=resources,
)

timeout = self.MP_TIMEOUT if args.timeout is None else args.timeout
Expand Down

0 comments on commit 4b2eee5

Please sign in to comment.