Skip to content

Commit

Permalink
Merge branch 'tickets/DM-31043'
Browse files Browse the repository at this point in the history
  • Loading branch information
mxk62 committed Jul 14, 2021
2 parents 217a9cb + 0ade2e1 commit c386b2b
Show file tree
Hide file tree
Showing 27 changed files with 525 additions and 419 deletions.
8 changes: 4 additions & 4 deletions python/lsst/ctrl/bps/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@

__path__ = pkgutil.extend_path(__path__, __name__)

from .bps_config import BpsConfig, BpsFormatter
from .generic_workflow import GenericWorkflow, GenericWorkflowJob, GenericWorkflowFile
from .wms_service import BaseWmsService, BaseWmsWorkflow, WmsStates, WmsJobReport, WmsRunReport
from .clustered_quantum_graph import ClusteredQuantumGraph
from .bps_config import *
from .clustered_quantum_graph import *
from .generic_workflow import *
from .version import *
from .wms_service import *
48 changes: 26 additions & 22 deletions python/lsst/ctrl/bps/bps_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Configuration class that adds order to searching sections for value,
expands environment variables and other config variables.
"""
Configuration class that adds order to searching sections for value,
expands environment variables and other config variables
"""

__all__ = ["BPS_SEARCH_ORDER", "BpsConfig", "BpsFormatter"]


from os.path import expandvars
import logging
Expand All @@ -32,16 +34,18 @@

from lsst.daf.butler.core.config import Config


_LOG = logging.getLogger(__name__)

BPS_SEARCH_ORDER = ["payload", "pipetask", "site", "bps_defined"]


class BpsFormatter(string.Formatter):
"""String formatter class that allows BPS config
search options
"""String formatter class that allows BPS config search options.
"""
def get_field(self, field_name, args, kwargs):
_, val = args[0].search(field_name, opt=args[1])
return (val, field_name)
return val, field_name

def get_value(self, key, args, kwargs):
_, val = args[0].search(key, opt=args[1])
Expand All @@ -56,7 +60,7 @@ class BpsConfig(Config):
other : `str`, `dict`, `Config`, `BpsConfig`
Path to a yaml file or a dict/Config/BpsConfig containing configuration
to copy.
search_order : `list` of `str`, optional
search_order : `list` [`str`], optional
Root section names in the order in which they should be searched.
"""
def __init__(self, other, search_order=None):
Expand Down Expand Up @@ -95,17 +99,17 @@ def __init__(self, other, search_order=None):
self[key] = {}

def copy(self):
"""Makes a copy of config
"""Make a copy of config.
Returns
-------
copy : `~lsst.ctrl.bps.bps_config.BpsConfig`
A duplicate of itself
copy : `lsst.ctrl.bps.BpsConfig`
A duplicate of itself.
"""
return BpsConfig(self)

def __getitem__(self, name):
"""Returns the value from the config for the given name
"""Return the value from the config for the given name.
Parameters
----------
Expand All @@ -114,15 +118,15 @@ def __getitem__(self, name):
Returns
-------
val : `str`, `int`, `~lsst.ctrl.bps.bps_config.BPSConfig`, ...
Value from config if found
val : `str`, `int`, `lsst.ctrl.bps.BPSConfig`, ...
Value from config if found.
"""
_, val = self.search(name, {})

return val

def __contains__(self, name):
"""Checks whether name is in config.
"""Check whether name is in config.
Parameters
----------
Expand All @@ -132,13 +136,13 @@ def __contains__(self, name):
Returns
-------
found : `bool`
Whether name was in config or not
Whether name was in config or not.
"""
found, _ = self.search(name, {})
return found

def search(self, key, opt=None):
"""Searches for key using given opt following hierarchy rules.
"""Search for key using given opt following hierarchy rules.
Search hierarchy rules: current values, a given search object, and
search order of config sections.
Expand All @@ -147,7 +151,7 @@ def search(self, key, opt=None):
----------
key : `str`
Key to look for in config.
opt : `dict`, optional
opt : `dict` [`str`, `Any`], optional
Options dictionary to use while searching. All are optional.
``"curvals"``
Expand All @@ -174,9 +178,9 @@ def search(self, key, opt=None):
Returns
-------
found : `bool`
Whether name was in config or not
value : `str`, `int`, `BpsConfig`, ...
Value from config if found
Whether name was in config or not.
value : `str`, `int`, `lsst.ctrl.bps.BpsConfig`, ...
Value from config if found.
"""
_LOG.debug("search: initial key = '%s', opt = '%s'", key, opt)

Expand Down Expand Up @@ -216,7 +220,6 @@ def search(self, key, opt=None):
if "curr_" + sect in curvals:
currkey = curvals["curr_" + sect]
_LOG.debug("currkey for section %s = %s", sect, currkey)
# search_sect = Config.__getitem__(search_sect, currkey)
if Config.__contains__(search_sect, currkey):
search_sect = Config.__getitem__(search_sect, currkey)

Expand Down Expand Up @@ -264,7 +267,8 @@ def search(self, key, opt=None):
# default only applies to original search key
default = opt.pop("default", None)

# Temporarily replace any env vars so formatter doesn't try to replace them.
# Temporarily replace any env vars so formatter doesn't try to
# replace them.
value = re.sub(r"\${([^}]+)}", r"<BPSTMP:\1>", value)

value = self.formatter.format(value, self, opt)
Expand Down
9 changes: 5 additions & 4 deletions python/lsst/ctrl/bps/bps_draw.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,22 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Functions used to draw BPS graphs
"""Functions used to draw BPS graphs.
"""
import logging
import networkx


_LOG = logging.getLogger(__name__)


def draw_networkx_dot(graph, outname):
"""Saves drawing of expanded graph to file
"""Save drawing of expanded graph to file.
Parameters
----------
graph :
NetworkX digraph
graph : `networkx.DiGraph`
The graph which graphical representation should be persisted.
outname : `str`
Output filename for drawn graph
"""
Expand Down
14 changes: 8 additions & 6 deletions python/lsst/ctrl/bps/bps_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@
import logging
from enum import Enum


_LOG = logging.getLogger(__name__)


class WhenToSaveQuantumGraphs(Enum):
"""Values for when to save the job quantum graphs."""
"""Values for when to save the job quantum graphs.
"""
QGRAPH = 1 # Must be using single_quantum_clustering algorithm.
TRANSFORM = 2
PREPARE = 3
Expand Down Expand Up @@ -62,11 +64,11 @@ def create_job_quantum_graph_filename(job, out_prefix=None):
Parameters
----------
job : `~lsst.ctrl.bps.generic_workflow.GenericWorkflowJob`
job : `lsst.ctrl.bps.GenericWorkflowJob`
Job for which the QuantumGraph file is being saved.
out_prefix : `str`, optional
Path prefix for the QuantumGraph filename. If no
out_prefix is given, uses current working directory.
Path prefix for the QuantumGraph filename. If no out_prefix is given,
uses current working directory.
Returns
-------
Expand All @@ -89,11 +91,11 @@ def save_qg_subgraph(qgraph, out_filename, node_ids=None):
Parameters
----------
qgraph : `~lsst.pipe.base.QuantumGraph`
qgraph : `lsst.pipe.base.QuantumGraph`
QuantumGraph to save.
out_filename : `str`
Name of the output file.
node_ids : `list` of `~lsst.pipe.base.NodeId`
node_ids : `list` [`lsst.pipe.base.NodeId`]
NodeIds for the subgraph to save to file.
"""
if not os.path.exists(out_filename):
Expand Down
3 changes: 2 additions & 1 deletion python/lsst/ctrl/bps/cancel.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

from lsst.utils import doImport


_LOG = logging.getLogger(__name__)


Expand All @@ -34,7 +35,7 @@ def cancel(wms_service, wms_id=None, user=None, require_bps=True, pass_thru=None
Parameters
----------
wms_service : `str` or `~lsst.ctrl.bps.wms_service.WmsService`
wms_service : `str` or `lsst.ctrl.bps.BaseWmsService`
Name of the Workload Management System service class.
wms_id : `str`, optional
ID or path of job that should be canceled.
Expand Down
2 changes: 1 addition & 1 deletion python/lsst/ctrl/bps/cli/cmd/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import click

from lsst.daf.butler.cli.utils import MWCommand
from lsst.ctrl.bps.drivers import (
from ...drivers import (
acquire_qgraph_driver,
cluster_qgraph_driver,
transform_driver,
Expand Down
13 changes: 8 additions & 5 deletions python/lsst/ctrl/bps/clustered_quantum_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Class definitions for a Clustered QuantumGraph where a node
in the graph is a QuantumGraph.
"""Class definitions for a Clustered QuantumGraph where a node in the graph is
a QuantumGraph.
"""

__all__ = ["ClusteredQuantumGraph"]


import networkx


Expand All @@ -42,7 +45,7 @@ def add_cluster(self, name, node_ids, label=None, tags=None):
----------
name : `str`
Node name which must be unique in the graph.
node_ids : `list` of `~lsst.pipe.base.NodeId`
node_ids : `list` [`lsst.pipe.base.NodeId`]
NodeIds for QuantumGraph subset.
label : `str`, optional
Label for the cluster. Can be used in grouping clusters.
Expand All @@ -53,11 +56,11 @@ def add_cluster(self, name, node_ids, label=None, tags=None):

def add_node(self, node_for_adding, **attr):
"""Override add_node function to ensure that nodes are limited
to QuantumGraphs
to QuantumGraphs.
Parameters
----------
node_for_adding : `str` or `list` of `~lsst.pipe.base.NodeId`
node_for_adding : `str` or `list` [`lsst.pipe.base.NodeId`]
Name of cluster or cluster data (list of NodeIds).
attr : keyword arguments, optional
Attributes to be saved with node in graph.
Expand Down
23 changes: 14 additions & 9 deletions python/lsst/ctrl/bps/drivers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,25 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Driver functions for each subcommand.
Driver functions ensure that ensure all setup work is done before running
the subcommand method.
"""


__all__ = [
"acquire_qgraph_driver",
"cluster_qgraph_driver",
"transform_driver",
"prepare_driver",
"submit_driver",
"report_driver",
"cancel_driver",
]


import getpass
import logging
import os
Expand All @@ -40,12 +45,12 @@

from lsst.obs.base import Instrument

from . import BpsConfig
from . import BPS_SEARCH_ORDER, BpsConfig
from .bps_draw import draw_networkx_dot
from .pre_transform import acquire_quantum_graph, cluster_quanta
from .transform import transform
from .prepare import prepare
from .submit import BPS_SEARCH_ORDER, submit
from .submit import submit
from .cancel import cancel
from .report import report

Expand All @@ -63,7 +68,7 @@ def _init_submission_driver(config_file):
Returns
-------
config : `~lsst.ctrl.bps.BpsConfig`
config : `lsst.ctrl.bps.BpsConfig`
Batch Processing Service configuration.
"""
config = BpsConfig(config_file, BPS_SEARCH_ORDER)
Expand All @@ -90,9 +95,9 @@ def acquire_qgraph_driver(config_file):
Returns
-------
config : `~lsst.ctrl.bps.BpsConfig`
config : `lsst.ctrl.bps.BpsConfig`
Updated configuration.
qgraph : `~lsst.pipe.base.graph.QuantumGraph`
qgraph : `lsst.pipe.base.graph.QuantumGraph`
A graph representing quanta.
"""
stime = time.time()
Expand All @@ -116,9 +121,9 @@ def cluster_qgraph_driver(config_file):
Returns
-------
config : `~lsst.ctrl.bps.BpsConfig`
config : `lsst.ctrl.bps.BpsConfig`
Updated configuration.
clustered_qgraph : `~lsst.ctrl.bps.ClusteredQuantumGraph`
clustered_qgraph : `lsst.ctrl.bps.ClusteredQuantumGraph`
A graph representing clustered quanta.
"""
stime = time.time()
Expand Down Expand Up @@ -150,7 +155,7 @@ def transform_driver(config_file):
-------
generic_workflow_config : `lsst.ctrl.bps.BpsConfig`
Configuration to use when creating the workflow.
generic_workflow : `lsst.ctrl.bps.wms_workflow.BaseWmsWorkflow`
generic_workflow : `lsst.ctrl.bps.BaseWmsWorkflow`
Representation of the abstract/scientific workflow specific to a given
workflow management system.
"""
Expand Down Expand Up @@ -185,7 +190,7 @@ def prepare_driver(config_file):
-------
wms_config : `lsst.ctrl.bps.BpsConfig`
Configuration to use when creating the workflow.
workflow : `lsst.ctrl.bps.wms_workflow.BaseWmsWorkflow`
workflow : `lsst.ctrl.bps.BaseWmsWorkflow`
Representation of the abstract/scientific workflow specific to a given
workflow management system.
"""
Expand Down

0 comments on commit c386b2b

Please sign in to comment.