DM-34876: Add cloud concept to bps. #105

Merged 1 commit on May 26, 2022
9 changes: 9 additions & 0 deletions doc/changes/DM-34876.misc.rst
@@ -0,0 +1,9 @@
Add the concept of cloud, in particular to be used by the PanDA plugin.

* Submit YAML can specify the cloud with ``computeCloud``.
* Common cloud values can be specified in the ``cloud`` subsection::

      cloud:
        cloud_name_1:
          key1: value
          key2: value

* ``GenericWorkflowJob`` has ``compute_cloud``.
23 changes: 17 additions & 6 deletions doc/lsst.ctrl.bps/quickstart.rst
@@ -232,7 +232,7 @@ Restart a failed run with
where ``<id>`` is the id of the run that needs to be restarted. What the id is
depends on the workflow management system BPS is configured to use. For
example, if BPS was configured to use HTCondor, the only valid id is
the submit directory.

If the restart completed successfully, the command will output something
similar to:
@@ -255,7 +255,7 @@ command-line option and in the config file). The value of a setting is
determined by the following order:

#. command-line option,
#. config file (if used by a subcommand),
#. environment variable,
#. package default.

@@ -303,7 +303,8 @@ Environment variables can be used as well with ``${var}`` syntax, for example

Section names can be used to store default settings at a given concept level,
which can be overridden by settings at more specific concept levels. Currently
the order from most specific to general is: ``payload``, ``pipetask``, ``site``,
and ``cloud``.

**payload**
description of the submission including definition of inputs. These values
@@ -347,6 +348,12 @@ order from most specific to general is: ``payload``, ``pipetask``, and ``site``.
which are matched to ``computeSite``. See the documentation of the WMS
plugin in use for examples of site specifications.

**cloud**
settings for a particular cloud (a group of sites) can be set here.
Subsections are cloud names, which are matched to ``computeCloud``. See
the documentation of the WMS plugin in use for examples of cloud
specifications.
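
As a concrete sketch (names are hypothetical, mirroring the new unit tests in
this PR), a value set at the ``site`` level wins over the same value at the
``cloud`` level because ``site`` is the more specific concept:

    from lsst.ctrl.bps import BPS_SEARCH_ORDER, BpsConfig

    config = BpsConfig(
        {
            "computeCloud": "cloud1",
            "computeSite": "site1",
            # Cloud-wide default, overridden by the more specific site below.
            "cloud": {"cloud1": {"runQuantumCommand": "c1exe -q {qgraphFile}"}},
            "site": {"site1": {"runQuantumCommand": "s1exe -q {qgraphFile}"}},
        },
        BPS_SEARCH_ORDER,
    )
    # Jobs on site1 resolve runQuantumCommand to the site-level "s1exe ..."; a
    # site without its own setting falls back to the cloud-level "c1exe ...".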

Supported settings
^^^^^^^^^^^^^^^^^^

@@ -383,6 +390,10 @@ Supported settings
Specification of the compute site where to run the workflow and which site
settings to use in ``bps prepare``.

**computeCloud**
Specification of the compute cloud where to run the workflow and which
cloud settings to use in ``bps prepare``.

**createQuantumGraph**
The command line specification for generating QuantumGraphs.

@@ -430,7 +441,7 @@ Supported settings
The process will continue until the number of retries reaches its limit
determined by ``numberOfRetries`` (5 by default) *or* the resultant memory
request reaches the memory cap determined by ``requestMemoryMax``.

Once the memory request reaches the cap, the job will be run one time,
allowed to use the amount of memory determined by the cap (provided a
retry is still permitted), and removed from the job queue afterwards if it
@@ -447,7 +458,7 @@ Supported settings
GB), and ``memoryMultiplier = 2.0``, the job will be allowed to use 64 GB
of memory during its first retry. If it fails due to insufficient memory,
it will be removed from the job queue.

In both examples, if the job keeps failing for other reasons, the final
number of retries will be determined by ``numberOfRetries``.
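
In effect the request on attempt *n* is the smaller of the scaled request and
the cap; a small illustrative sketch of that arithmetic (the function is not
part of the package):

    def memory_request_gb(request_memory, multiplier, memory_max, retry):
        """Memory requested on a given attempt: scaled by the multiplier on
        each retry, but never above the cap."""
        return min(request_memory * multiplier**retry, memory_max)

    # requestMemory=32, memoryMultiplier=2.0, requestMemoryMax=64:
    # attempt 0 -> 32 GB, retry 1 -> 64 GB (the cap; the job runs once more
    # at 64 GB and is removed from the queue if it again runs out of memory).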

@@ -851,7 +862,7 @@ e.g. ``submit/shared/pipecheck/20220407T184331Z/quantumGraphGeneration.out``.

.. _bps-appendix-a:

Appendix A
----------

Prerequisites
2 changes: 1 addition & 1 deletion python/lsst/ctrl/bps/bps_config.py
@@ -43,7 +43,7 @@
with resources.path(etc, "bps_defaults.yaml") as path:
BPS_DEFAULTS = Config(str(path)).toDict()

BPS_SEARCH_ORDER = ["bps_cmdline", "payload", "cluster", "pipetask", "site", "cloud", "bps_defined"]

# Need a string that won't be a valid default value
# to indicate whether default was defined for search.
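
For reference, ``config.search`` consults these scopes in order and returns a
``(found, value)`` pair; a minimal sketch of a lookup, with semantics inferred
from the transform code and tests below (setting names and values hypothetical):

    from lsst.ctrl.bps import BPS_SEARCH_ORDER, BpsConfig

    config = BpsConfig(
        {"site": {"site1": {"requestMemory": 2048}}},  # hypothetical site setting
        BPS_SEARCH_ORDER,
    )
    search_opt = {"curvals": {"curr_site": "site1"}, "required": False}
    found, value = config.search("requestMemory", opt=search_opt)
    # Expect (True, 2048): the site scope is consulted before bps defaults.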
6 changes: 6 additions & 0 deletions python/lsst/ctrl/bps/generic_workflow.py
@@ -265,6 +265,10 @@ class GenericWorkflowJob:
"""Environment variable names and values to be explicitly set inside job.
"""

compute_cloud: Optional[str]
"""Key to look up cloud-specific information for running the job.
"""

# As of Python 3.7.8, can't use __slots__ if we give default values, so
# writing our own __init__.
def __init__(self, name: str):
@@ -300,6 +304,7 @@ def __init__(self, name: str):
self.profile = {}
self.attrs = {}
self.environment = {}
self.compute_cloud = None

__slots__ = (
"name",
@@ -334,6 +339,7 @@ def __init__(self, name: str):
"preemptible",
"profile",
"attrs",
"compute_cloud",
)

def __hash__(self):
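
A minimal sketch of the new attribute in use (job name and cloud label
hypothetical; ``transform.py`` below populates it from the resolved
``computeCloud``):

    from lsst.ctrl.bps.generic_workflow import GenericWorkflowJob

    job = GenericWorkflowJob("someJob")
    job.compute_cloud = "cloud1"  # key into the cloud subsection; defaults to None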
24 changes: 22 additions & 2 deletions python/lsst/ctrl/bps/transform.py
@@ -174,6 +174,12 @@ def create_init_workflow(config, qgraph, qgraph_gwfile):
"replaceEnvVars": True,
"required": False,
}
found, value = config.search("computeSite", opt=search_opt)
if found:
search_opt["curvals"]["curr_site"] = value
found, value = config.search("computeCloud", opt=search_opt)
if found:
search_opt["curvals"]["curr_cloud"] = value

init_workflow = GenericWorkflow("init")
init_workflow.add_file(qgraph_gwfile)
@@ -440,6 +446,7 @@ def _get_job_values(config, search_opt, cmd_line_key):
job_values : `dict` [ `str`, `Any` ]`
A mapping between job attributes and their values.
"""
_LOG.debug("cmd_line_key=%s, search_opt=%s", cmd_line_key, search_opt)
job_values = {}
for attr in _ATTRS_ALL:
# Variable names in yaml are camel case instead of snake case.
@@ -667,6 +674,12 @@ def create_generic_workflow(config, cqgraph, name, prefix):

# First get job values from cluster or cluster config
search_opt["curvals"] = {"curr_cluster": cluster.label}
found, value = config.search("computeSite", opt=search_opt)
if found:
search_opt["curvals"]["curr_site"] = value
found, value = config.search("computeCloud", opt=search_opt)
if found:
search_opt["curvals"]["curr_cloud"] = value

# If some config values are set for this cluster
if cluster.label not in cached_job_values:
@@ -712,7 +725,7 @@ def create_generic_workflow(config, cqgraph, name, prefix):
qnode = cqgraph.get_quantum_node(node_id)

if qnode.taskDef.label not in cached_pipetask_values:
search_opt["curvals"]["curr_pipetask"] = qnode.taskDef.label
cached_pipetask_values[qnode.taskDef.label] = _get_job_values(
config, search_opt, "runQuantumCommand"
)
@@ -802,7 +815,14 @@ def add_final_job(config, generic_workflow, prefix):
_, when_create = config.search(".executionButler.whenCreate")
_, when_merge = config.search(".executionButler.whenMerge")

search_opt = {"searchobj": config[".executionButler"], "curvals": {}, "default": None}
found, value = config.search("computeSite", opt=search_opt)
if found:
search_opt["curvals"]["curr_site"] = value
found, value = config.search("computeCloud", opt=search_opt)
if found:
search_opt["curvals"]["curr_cloud"] = value

if when_create.upper() != "NEVER" and when_merge.upper() != "NEVER":
# create gwjob
gwjob = GenericWorkflowJob("mergeExecutionButler")
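
The same two-step lookup (resolve ``computeSite``/``computeCloud``, then record
them in ``curvals``) now appears in three places: ``create_init_workflow``,
``create_generic_workflow``, and ``add_final_job``. A hedged sketch of the
shared pattern, not code from this PR:

    def _set_current_location(config, search_opt):
        # Record the resolved computeSite/computeCloud in curvals so that
        # later config.search calls can match the site/cloud subsections.
        for key, curval in (("computeSite", "curr_site"), ("computeCloud", "curr_cloud")):
            found, value = config.search(key, opt=search_opt)
            if found:
                search_opt["curvals"][curval] = value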
8 changes: 8 additions & 0 deletions tests/cqg_test_utils.py
@@ -23,7 +23,9 @@

import uuid

from lsst.ctrl.bps.quantum_clustering_funcs import dimension_clustering
from networkx import is_directed_acyclic_graph
from qg_test_utils import make_test_quantum_graph


def check_cqg(cqg, truth=None):
@@ -159,3 +161,9 @@ def compare_cqg_dicts(truth, cqg):
assert set(truth["edges"]) == set(
cqg["edges"]
), f"Mismatch edges: truth={truth['edges']}, cqg={cqg['edges']}"


def make_test_clustered_quantum_graph(config):
    """Create a clustered quantum graph for testing by running dimension
    clustering on the shared test QuantumGraph."""
    qgraph = make_test_quantum_graph()
    cqg = dimension_clustering(config, qgraph, "test_cqg")
    return cqg
135 changes: 135 additions & 0 deletions tests/test_transform.py
@@ -0,0 +1,135 @@
# This file is part of ctrl_bps.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Unit tests of transform.py"""
import os
import shutil
import tempfile
import unittest

from cqg_test_utils import make_test_clustered_quantum_graph
from lsst.ctrl.bps import BPS_SEARCH_ORDER, BpsConfig
from lsst.ctrl.bps.transform import create_generic_workflow, create_generic_workflow_config

TESTDIR = os.path.abspath(os.path.dirname(__file__))


class TestCreateGenericWorkflowConfig(unittest.TestCase):
"""Tests of create_generic_workflow_config."""

def testCreate(self):
"""Test successful creation of the config."""
config = BpsConfig({"a": 1, "b": 2, "uniqProcName": "testCreate"})
wf_config = create_generic_workflow_config(config, "/test/create/prefix")
assert isinstance(wf_config, BpsConfig)
for key in config:
assert wf_config[key] == config[key]
assert wf_config["workflowName"] == "testCreate"
assert wf_config["workflowPath"] == "/test/create/prefix"


class TestCreateGenericWorkflow(unittest.TestCase):
"""Tests of create_generic_workflow."""

def setUp(self):
self.tmpdir = tempfile.mkdtemp(dir=TESTDIR)
self.config = BpsConfig(
{
"runInit": True,
"computeSite": "global",
"runQuantumCommand": "gexe -q {qgraphFile} --qgraph-node-id {qgraphNodeId}",
"clusterTemplate": "{D1}_{D2}",
"cluster": {
"cl1": {"pipetasks": "T1, T2", "dimensions": "D1, D2"},
"cl2": {"pipetasks": "T3, T4", "dimensions": "D1, D2"},
},
"cloud": {
"cloud1": {"runQuantumCommand": "c1exe -q {qgraphFile} --qgraph-node-id {qgraphNodeId}"},
"cloud2": {"runQuantumCommand": "c2exe -q {qgraphFile} --qgraph-node-id {qgraphNodeId}"},
},
"site": {
"site1": {"runQuantumCommand": "s1exe -q {qgraphFile} --qgraph-node-id {qgraphNodeId}"},
"site2": {"runQuantumCommand": "s2exe -q {qgraphFile} --qgraph-node-id {qgraphNodeId}"},
"global": {"runQuantumCommand": "s3exe -q {qgraphFile} --qgraph-node-id {qgraphNodeId}"},
},
# Needed because transform assumes they exist
"whenSaveJobQgraph": "NEVER",
"executionButler": {"whenCreate": "SUBMIT", "whenMerge": "ALWAYS"},
},
BPS_SEARCH_ORDER,
)
self.cqg = make_test_clustered_quantum_graph(self.config)

def tearDown(self):
shutil.rmtree(self.tmpdir, ignore_errors=True)

def testCreatingGenericWorkflowGlobal(self):
"""Test creating a GenericWorkflow with global settings."""
config = BpsConfig(self.config)
config["computeCloud"] = "cloud1"
config["computeSite"] = "site2"
config["queue"] = "global_queue"
print(config)
workflow = create_generic_workflow(config, self.cqg, "test_gw", self.tmpdir)
for jname in workflow:
gwjob = workflow.get_job(jname)
print(gwjob)
assert gwjob.compute_site == "site2"
assert gwjob.compute_cloud == "cloud1"
assert gwjob.executable.src_uri == "s2exe"
assert gwjob.queue == "global_queue"
final = workflow.get_final()
assert final.compute_site == "site2"
assert final.compute_cloud == "cloud1"
assert final.queue == "global_queue"

def testCreatingQuantumGraphMixed(self):
"""Test creating a GenericWorkflow with setting overrides."""
config = BpsConfig(self.config)
config[".cluster.cl1.computeCloud"] = "cloud2"
config[".cluster.cl1.computeSite"] = "notthere"
config[".cluster.cl2.computeSite"] = "site1"
config[".executionButler.queue"] = "special_final_queue"
config[".executionButler.computeSite"] = "special_site"
config[".executionButler.computeCloud"] = "special_cloud"
workflow = create_generic_workflow(config, self.cqg, "test_gw", self.tmpdir)
for jname in workflow:
gwjob = workflow.get_job(jname)
print(gwjob)
if jname.startswith("cl1"):
assert gwjob.compute_site == "notthere"
assert gwjob.compute_cloud == "cloud2"
assert gwjob.executable.src_uri == "c2exe"
elif jname.startswith("cl2"):
assert gwjob.compute_site == "site1"
assert gwjob.compute_cloud is None
assert gwjob.executable.src_uri == "s1exe"
elif jname.startswith("pipetask"):
assert gwjob.compute_site == "global"
assert gwjob.compute_cloud is None
assert gwjob.executable.src_uri == "s3exe"
final = workflow.get_final()
assert final.compute_site == "special_site"
assert final.compute_cloud == "special_cloud"
assert final.queue == "special_final_queue"


if __name__ == "__main__":
unittest.main()