Updates based upon review comments.
MichelleGower committed Jul 29, 2021
1 parent f84aaed commit 24eecbf
Showing 17 changed files with 149 additions and 29 deletions.
90 changes: 90 additions & 0 deletions doc/changes/DM-28653.feature.rst
@@ -0,0 +1,90 @@
* Added a bps htcondor job setting that puts jobs that get signal 7
when exceeding memory on hold. The hold message will say: "Job
raised a signal 7. Usually means job has gone over memory limit."
Until bps has automatic memory-exceeded retries, you can restart
these jobs the same way as jobs that htcondor held for exceeding
memory limits (condor_qedit and condor_release).

* Too many files were being written to a single directory in
job/<label>. There is now a subdirectory template for it defined in yaml:
subDirTemplate: "{label}/{tract}/{patch}/{visit.day_obs}/{exposure.day_obs}/{band}/{subfilter}/{physical_filter}/{visit}/{exposure}"

To revert to the previous behavior, set this in your submit yaml:
subDirTemplate: "{label}"
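
Any subset of these template variables may be used; for example, to keep
per-label and per-visit grouping while dropping the deeper nesting, one
could set (an illustrative choice, not a shipped default):
subDirTemplate: "{label}/{visit}"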

* bps now has defaults, so submit yamls can be a lot simpler and should
require fewer changes when bps or pipetask changes. For the default
values see ${CTRL_BPS_DIR}/python/lsst/ctrl/bps/etc/bps_defaults.yaml.
See ${CTRL_BPS_DIR}/doc/lsst.ctrl.bps/pipelines_check.yaml for
an example of a much simpler submit yaml.

Values in bps_defaults.yaml are overridden by values in the submit
yaml (be careful of scoping rules; e.g., values in a pipetask
section override the global setting).

It is STRONGLY recommended to remove (or comment out) settings in the
submit yaml that are already set in the default yaml (i.e., the settings
that are the same across runs, across repos, ...)

It would be helpful to know in what cases submit yamls have to
override default settings, in particular the command lines.
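
As a sketch, a pared-down submit yaml relying on the defaults might contain
little more than the payload description. The keys below are patterned after
the pipelines_check example referenced above; treat the exact names and
values as illustrative rather than authoritative:

pipelineYaml: "${PIPELINES_CHECK_DIR}/pipelines/pipelines_check.yaml"
payload:
  payloadName: pipelines_check
  butlerConfig: "${PIPELINES_CHECK_DIR}/DATA_REPO/butler.yaml"
  inCollection: HSC/calib,HSC/raw/all,refcats
  outCollection: "u/${USER}/pipelines_check/{timestamp}"
  dataQuery: exposure=903342 AND detector=10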

* With the above defaults, one can more easily append options to the
pipetask command lines via variables in the submit yaml:
* extraQgraphOptions: Adds the given string to the end of the command
line for creating the QuantumGraph (e.g., for specifying a task with -t)
* extraInitOptions: Adds the given string to the end of the pipetaskInit
command line
* extraRunQuantumOptions: Adds the given string to the end of the pipetask
command line for running a Quantum (e.g., "--no-versions")

These can also be specified on the command line (see `bps submit --help`).
* --extra-qgraph-options TEXT
* --extra-init-options TEXT
* --extra-run-quantum-options TEXT

Settings on command line override values set in submit yaml.

The default commands no longer include "--no-versions" or saving
a dot version of the QuantumGraph. Use the appropriate new variable
or command-line option to add those back (see the sketch below).
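
For example, to restore the old "--no-versions" behavior and add a task
when building the QuantumGraph, the submit yaml could contain (the option
strings, and the task name in particular, are purely illustrative):

extraRunQuantumOptions: "--no-versions"
extraQgraphOptions: "-t mytask"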

* Can specify some pipetask options on command line (see `bps submit --help`):
* -b, --butler-config TEXT
* -i, --input COLLECTION ...
* -o, --output COLL
* --output-run COLL
* -d, --data-query QUERY
* -p, --pipeline FILE
* -g, --qgraph TEXT

Settings on command line override values set in submit yaml.
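
In the submit yaml these correspond to configuration settings. A sketch with
illustrative values follows; the qgraphFile and pipelineYaml mappings appear
in the drivers.py change below, butlerConfig/inCollection/outCollection
appear in the default createCommand, and the remaining pairings are
assumptions:

butlerConfig: /repo/main          # -b, --butler-config
inCollection: HSC/defaults        # -i, --input
outCollection: "u/${USER}/run1"   # --output-run
dataQuery: "exposure = 903342"    # -d, --data-query
pipelineYaml: my_pipeline.yaml    # -p, --pipeline
qgraphFile: existing.qgraph       # -g, --qgraph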

* bps now saves two yaml files in a run's submit directory: one is
just a copy of the submit yaml (under its original filename), and
one is a dump of the config after combining command-line options,
defaults, and the submit yaml (<run>_config.yaml).

* If pipetask starts reporting errors about database connections
(e.g., remaining connection slots are reserved for non-replication
superuser connections), ask on #dm-middleware-support about
using execution butler in bps. This greatly reduces the number of
connections to the central database per run. It is not yet the default
behavior of bps, but one can modify the submit yaml to use it (see the
sketch at the end of this item). See
${CTRL_BPS_DIR}/doc/lsst.ctrl.bps/pipelines_check_execution_butler.yaml

The major differences visible to users are:
* bps report shows a new job called mergeExecutionButler in the detailed view.
This is what saves the run info into the central butler repository.
As with any job, it can succeed or fail. Unlike other jobs, it
will execute at the end of a run regardless of whether any job failed.
It will even execute if the run is cancelled, unless the cancellation
happens while the merge is running. Its output will go where other
jobs' output goes (at NCSA, in the jobs/mergeExecutionButler directory).
* New files in the submit directory:
* EXEC_REPO-<run>: Execution butler (yaml + initial sqlite file)
* execution_butler_creation.out: output of command to create execution butler
* final_job.bash: Script that is executed to do the merging of the run info into the central repo.
* final_post_mergeExecutionButler.out: An internal file for debugging incorrect reporting of final run status.
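
A sketch of the submit yaml section that enables this, assembled from the
pipelines_check_execution_butler.yaml fragment shown later in this commit
(additional keys may be needed that the fragment does not show):

executionButler:
  whenCreate: "SUBMIT"   # recommended; see quickstart.rst
  whenMerge: "ALWAYS"
  implementation: JOB    # JOB, WORKFLOW
  concurrency_limit: db_limit
  createCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask qgraph -b {butlerConfig} --input {inCollection} --output-run {outCollection} --save-execution-butler {executionButlerDir} -g {qgraphFile}"
  # What commands to run as part of the merge step:
  command1: "${DAF_BUTLER_DIR}/bin/butler --log-level=VERBOSE transfer-datasets {executionButlerDir} {butlerConfig} --collections {outCollection}"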

6 changes: 6 additions & 0 deletions doc/changes/DM-29893.feature.rst
@@ -0,0 +1,6 @@
* bps report
* Columns are now as wide as the widest value/heading,
plus some other minor formatting changes.
* Detailed report (--id) now has an Expected column
that shows expected counts per PipelineTask label
from the QuantumGraph.
24 changes: 24 additions & 0 deletions doc/changes/README.rst
@@ -0,0 +1,24 @@
Recording Changes
=================

This directory contains "news fragments" which are small files containing text that will be integrated into release notes.
The files can be in reStructuredText format or plain text.

Each file should be named like ``<JIRA TICKET>.<TYPE>`` with a file extension defining the markup format.
The ``<TYPE>`` should be one of:

* ``feature``: A new feature.
* ``bugfix``: A bug fix.
* ``api``: An API change.
* ``perf``: A performance enhancement.
* ``doc``: A documentation improvement.
* ``removal``: An API removal or deprecation.
* ``other``: Other changes and additions of interest to general users.
* ``misc``: Changes that are of minor interest.

An example file name would therefore look like ``DM-30291.misc.rst``.

If the change specifically concerns the registry or a datastore, the news fragment can be placed in the relevant subdirectory.

You can test how the content will be integrated into the release notes by running ``towncrier --draft --version=V.vv``.
``towncrier`` can be installed from PyPI or conda-forge.
2 changes: 1 addition & 1 deletion doc/lsst.ctrl.bps/pipelines_check_execution_butler.yaml
@@ -11,7 +11,7 @@ executionButler:
   createCommand: "${CTRL_MPEXEC_DIR}/bin/pipetask qgraph -b {butlerConfig} --input {inCollection} --output-run {outCollection} --save-execution-butler {executionButlerDir} -g {qgraphFile}"
   whenMerge: "ALWAYS"
   implementation: JOB  # JOB, WORKFLOW
   concurrency_limit: db_limit

   # What commands to run as part of the merge step:
   command1: "${DAF_BUTLER_DIR}/bin/butler --log-level=VERBOSE transfer-datasets {executionButlerDir} {butlerConfig} --collections {outCollection}"
12 changes: 6 additions & 6 deletions doc/lsst.ctrl.bps/quickstart.rst
@@ -162,7 +162,7 @@ via variables.

- ``--extra-qgraph-options``
String to pass through to QuantumGraph builder.
Replaces variable ``extraQgraphOptions`` in ``createQuantumGraph``.
- ``--extra-init-options``
String to pass through to pipetaskInit execution.
Replaces variable ``extraInitOptions`` in ``pipetaskInit``'s
@@ -307,7 +307,7 @@ syntax to be mindful about is that boolean values must be all
lowercase.

${CTRL_BPS_DIR}/python/lsst/ctrl/bps/etc/bps_defaults.yaml
contains default values used by every bps submission and is
automatically included.

Configuration file can include other configuration files using
@@ -639,7 +639,7 @@ For ``--replace-run`` behavior, replace the one collection-chain command with th
**whenCreate**
When during the submission process the Execution Butler is created.
whenCreate valid values: "NEVER", "ACQUIRE", "TRANSFORM", "PREPARE",
"SUBMIT". The recommended setting is "SUBMIT" because the run collection
is stored in the Execution Butler and that should be set as late as
possible in the submission process.
@@ -679,8 +679,8 @@ For ``--replace-run`` behavior, replace the one collection-chain command with th
a little workflow representing the sequence of commands.

**concurrency_limit**
Name of the concurrency limit. For butler repositories that need to
limit the number of simultaneous merges, this name tells the plugin to
limit the number of mergeExecutionButler jobs via some mechanism, e.g.,
a special queue.

@@ -704,7 +704,7 @@ The major differences visible to users are:
This is what saves the run info into the central butler repository.
As with any job, it can succeed or fail. Different from other jobs, it
will execute at the end of a run regardless of whether a job failed or
not. It will even execute if the run is cancelled unless the
cancellation is while the merge is running. Its output will go where
other jobs go (at NCSA in jobs/mergeExecutionButler directory).
- Extra files in submit directory:
12 changes: 7 additions & 5 deletions python/lsst/ctrl/bps/bps_config.py
@@ -26,14 +26,16 @@
 __all__ = ["BPS_SEARCH_ORDER", "BpsConfig", "BpsFormatter"]


-from os.path import expandvars, realpath, dirname
+from os.path import expandvars
 import logging
 import copy
 import string
 import re
+from importlib.resources import path as resources_path

 from lsst.daf.butler.core.config import Config

+from . import etc

 _LOG = logging.getLogger(__name__)

@@ -85,10 +87,10 @@ def __init__(self, other, search_order=None):
         super().__init__()

         if isinstance(other, str):
-            # First load default config from ctrl_bps, then
-            # override with user config.
-            bps_defaults = realpath(dirname(__file__) + "/../../../../etc/bps_defaults.yaml")
-            tmp_config = Config(bps_defaults)
+            # First load default config from ctrl_bps, then override with
+            # user config.
+            with resources_path(etc, "bps_defaults.yaml") as bps_defaults:
+                tmp_config = Config(str(bps_defaults))
             user_config = Config(other)
             tmp_config.update(user_config)
             other = tmp_config
1 change: 0 additions & 1 deletion python/lsst/ctrl/bps/bps_utils.py
@@ -68,7 +68,6 @@ def create_job_quantum_graph_filename(config, job, out_prefix=None):
     Parameters
     ----------
-    job : `lsst.ctrl.bps.GenericWorkflowJob`
     config : `lsst.ctrl.bps.BpsConfig`
         BPS configuration (at minimum must contain qgraphFile and
         outCollection).
2 changes: 2 additions & 0 deletions python/lsst/ctrl/bps/cli/opt/arguments.py
@@ -20,6 +20,8 @@
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 from lsst.daf.butler.cli.utils import MWArgumentDecorator

+__all__ = ["config_file_argument"]
+

 config_file_argument = MWArgumentDecorator(
     "config_file",
4 changes: 2 additions & 2 deletions python/lsst/ctrl/bps/cli/opt/option_groups.py
@@ -21,7 +21,7 @@
 """Option groups for bps
 """

-__all__ = ("SubmissionOptions",)
+__all__ = ["SubmissionOptions"]

 from lsst.daf.butler.cli.utils import OptionGroup, option_section
 from lsst.ctrl.mpexec.cli.opt import (
@@ -40,7 +40,7 @@
 )


-class SubmissionOptions(OptionGroup):  # noqa: N801
+class SubmissionOptions(OptionGroup):
     """Decorator to add options to a command function for any
     stage during submission.
     """
2 changes: 1 addition & 1 deletion python/lsst/ctrl/bps/cli/opt/options.py
@@ -21,7 +21,7 @@
 """bps-specific command-line options.
 """

-__all__ = ("extra_qgraph_option", "extra_init_option", "extra_run_quantum_option")
+__all__ = ["extra_qgraph_option", "extra_init_option", "extra_run_quantum_option"]

 from lsst.daf.butler.cli.utils import MWOptionDecorator

2 changes: 1 addition & 1 deletion python/lsst/ctrl/bps/drivers.py
@@ -83,7 +83,7 @@ def _init_submission_driver(config_file, **kwargs):
                  "qgraph": "qgraphFile",
                  "pipeline": "pipelineYaml"}
     for key, value in kwargs.items():
-        # Don't want to override config with None values
+        # Don't want to override config with None or empty string values.
         if value:
             # pipetask argument parser converts some values to list,
             # but bps will want string.
Empty file.
File renamed without changes.
8 changes: 4 additions & 4 deletions python/lsst/ctrl/bps/generic_workflow.py
@@ -672,7 +672,7 @@ def add_final(self, final):
         Parameters
         ----------
-        final : `lsst.ctrl.bps.GenericWorkflowJob` or
+        final : `lsst.ctrl.bps.GenericWorkflowJob` or \
             `lsst.ctrl.bps.GenericWorkflow`
             Information needed to execute the special final job(s), the
             job(s) to be executed after all jobs that can be executed
@@ -693,7 +693,7 @@ def get_final(self):
         Returns
         -------
-        final : `lsst.ctrl.bps.GenericWorkflowJob` or
+        final : `lsst.ctrl.bps.GenericWorkflowJob` or \
             `lsst.ctrl.bps.GenericWorkflow`
             Information needed to execute final job(s).
         """
@@ -710,7 +710,7 @@ def add_executable(self, executable):
         if executable is not None:
             self._executables[executable.name] = executable
         else:
-            _LOG.warning("add_executable the executable is None")
+            _LOG.warning("executable not specified (None); cannot add to the workflow's list of executables")

     def get_executables(self, data=False, transfer_only=True):
         """Retrieve executables from generic workflow.
@@ -730,7 +730,7 @@
             Filtered executable names or objects from generic workflow.
         """
         execs = []
-        for name, executable in self._execs.items():
+        for name, executable in self._executables.items():
             if not transfer_only or executable.transfer_executable:
                 if not data:
                     execs.append(name)
4 changes: 2 additions & 2 deletions python/lsst/ctrl/bps/pre_transform.py
@@ -47,7 +47,7 @@ def acquire_quantum_graph(config, out_prefix=""):
     ----------
     config : `lsst.ctrl.bps.BpsConfig`
         Configuration values for BPS. In particular, looking for qgraphFile.
-    out_prefix : `str`
+    out_prefix : `str`, optional
         Output path for the QuantumGraph and stdout/stderr from generating
         the QuantumGraph. Default value is empty string.
@@ -160,7 +160,7 @@ def create_quantum_graph(config, out_prefix=""):
     ----------
     config : `lsst.ctrl.bps.BpsConfig`
         BPS configuration.
-    out_prefix : `str`
+    out_prefix : `str`, optional
         Path in which to output QuantumGraph as well as the stdout/stderr
         from generating the QuantumGraph. Defaults to empty string so
         code will write the QuantumGraph and stdout/stderr to the current
6 changes: 3 additions & 3 deletions python/lsst/ctrl/bps/transform.py
@@ -243,10 +243,10 @@ def _fill_arguments(config, generic_workflow, arguments, cmdvals):
     _, use_shared = config.search("bpsUseShared", opt={"default": False})
     for file_key in re.findall(r"<FILE:([^>]+)>", arguments):
         gwfile = generic_workflow.get_file(file_key)
+        if use_shared and gwfile.job_shared:
+            uri = gwfile.src_uri
+        else:
-        if gwfile.wms_transfer and not use_shared or not gwfile.job_shared:
             uri = os.path.basename(gwfile.src_uri)
-        else:
-            uri = gwfile.src_uri
         arguments = arguments.replace(f"<FILE:{file_key}>", uri)

     # Replace env placeholder with submit-side values
3 changes: 0 additions & 3 deletions python/lsst/ctrl/bps/wms/htcondor/lssthtc.py
@@ -450,16 +450,13 @@ class HTCJob:
         Initial commands for job inside DAG.
     initattrs : `dict`
         Initial dictionary of job attributes.
-    subfile : `str`
-        Filename for the job's submit script.
     """
     def __init__(self, name, label=None, initcmds=(), initdagcmds=(), initattrs=None):
         self.name = name
         self.label = label
         self.cmds = RestrictedDict(HTC_VALID_JOB_KEYS, initcmds)
         self.dagcmds = RestrictedDict(HTC_VALID_JOB_DAG_KEYS, initdagcmds)
         self.attrs = initattrs
-        self.filename = None
         self.subfile = None

     def __str__(self):
