Skip to content

Commit

Permalink
Merge pull request #73 from lsst/tickets/DM-16867
Browse files Browse the repository at this point in the history
DM-16867: Move pieces of pipe_supertask to pipe_base
  • Loading branch information
andy-slac committed Jan 4, 2019
2 parents 91a1c50 + 3393fcb commit 76b0903
Show file tree
Hide file tree
Showing 13 changed files with 2,274 additions and 0 deletions.
5 changes: 5 additions & 0 deletions python/lsst/pipe/base/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,8 @@
from .timer import *
from .config import *
from .pipelineTask import *
from .pipeline import *
from .pipelineBuilder import *
from .graph import *
from .graphBuilder import *
from .taskFactory import *
125 changes: 125 additions & 0 deletions python/lsst/pipe/base/configOverrides.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Module which defines ConfigOverrides class and related methods.
"""

__all__ = ["ConfigOverrides"]

import lsst.pex.config as pexConfig
import lsst.pex.exceptions as pexExceptions


class ConfigOverrides:
"""Defines a set of overrides to be applied to a task config.
Overrides for task configuration need to be applied by activator when
creating task instances. This class represents an ordered set of such
overrides which activator receives from some source (e.g. command line
or some other configuration).
Methods
----------
addFileOverride(filename)
Add overrides from a specified file.
addValueOverride(field, value)
Add override for a specific field.
applyTo(config)
Apply all overrides to a `config` instance.
Notes
-----
Serialization support for this class may be needed, will add later if
necessary.
"""

def __init__(self):
self._overrides = []

def addFileOverride(self, filename):
"""Add overrides from a specified file.
Parameters
----------
filename : str
Path to the override file.
"""
self._overrides += [('file', filename)]

def addValueOverride(self, field, value):
"""Add override for a specific field.
This method is not very type-safe as it is designed to support
use cases where input is given as string, e.g. command line
activators. If `value` has a string type and setting of the field
fails with `TypeError` the we'll attempt `eval()` the value and
set the field with that value instead.
Parameters
----------
field : str
Fully-qualified field name.
value :
Value to be given to a filed.
"""
self._overrides += [('value', (field, value))]

def applyTo(self, config):
"""Apply all overrides to a task configuration object.
Parameters
----------
config : `pex.Config`
Raises
------
`Exception` is raised if operations on configuration object fail.
"""
for otype, override in self._overrides:
if otype == 'file':
config.load(override)
elif otype == 'value':
field, value = override
field = field.split('.')
# find object with attribute to set, throws if we name is wrong
obj = config
for attr in field[:-1]:
obj = getattr(obj, attr)
# If the type of the object to set is a list field, the value to assign
# is most likely a list, and we will eval it to get a python list object
# which will be used to set the objects value
# This must be done before the try, as it will otherwise set a string which
# is a valid iterable object when a list is the intended object
if isinstance(getattr(obj, field[-1]), pexConfig.listField.List) and isinstance(value, str):
try:
value = eval(value, {})
except Exception:
# Something weird happened here, try passing, and seeing if further
# code can handle this
raise pexExceptions.RuntimeError(f"Unable to parse {value} into a valid list")
try:
setattr(obj, field[-1], value)
except TypeError:
if not isinstance(value, str):
raise
# this can throw
value = eval(value, {})
setattr(obj, field[-1], value)
123 changes: 123 additions & 0 deletions python/lsst/pipe/base/graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Module defining quantum graph classes and related methods.
There could be different representations of the quantum graph depending
on the client needs. Presently this module contains graph implementation
which is based on requirements of command-line environment. In the future
we could add other implementations and methods to convert between those
representations.
"""

# "exported" names
__all__ = ["QuantumGraphNodes", "QuantumGraph"]

# -------------------------------
# Imports of standard modules --
# -------------------------------

# -----------------------------
# Imports for other modules --
# -----------------------------

# ----------------------------------
# Local non-exported definitions --
# ----------------------------------

# ------------------------
# Exported definitions --
# ------------------------


class QuantumGraphNodes:
"""QuantumGraphNodes represents a bunch of nodes in an quantum graph.
The node in quantum graph is represented by the `PipelineTask` and a
single `Quantum` instance. One possible representation of the graph is
just a list of nodes without edges (edges can be deduced from nodes'
quantum inputs and outputs if needed). That representation can be reduced
to the list of PipelineTasks and the corresponding list of Quanta.
This class defines this reduced representation.
Different frameworks may use different graph representation, this
representation was based mostly on requirements of command-line
executor which does not need explicit edges information.
Attributes
----------
taskDef : :py:class:`TaskDef`
Task defintion for this set of nodes.
quanta : `list` of :py:class:`lsst.daf.butler.Quantum`
List of quanta corresponding to the task.
"""
def __init__(self, taskDef, quanta):
self.taskDef = taskDef
self.quanta = quanta


class QuantumGraph(list):
"""QuantumGraph is a sequence of QuantumGraphNodes objects.
Typically the order of the tasks in the list will be the same as the
order of tasks in a pipeline (obviously depends on the code which
constructs graph).
Parameters
----------
iterable : iterable of :py:class:`QuantumGraphNodes` instances, optional
Initial sequence of per-task nodes.
"""
def __init__(self, iterable=None):
list.__init__(self, iterable or [])
self.initInputs = []
self.initOutputs = []
self._inputDatasetTypes = set()
self._outputDatasetTypes = set()

def quanta(self):
"""Iterator over quanta in a graph.
Yields
------
taskDef : `TaskDef`
Task definition for a Quantum.
quantum : `Quantum`
Single quantum.
"""
for taskNodes in self:
taskDef = taskNodes.taskDef
for quantum in taskNodes.quanta:
yield taskDef, quantum

def getDatasetTypes(self, initInputs=True, initOutputs=True, inputs=True, outputs=True):
total = set()
if initInputs:
for dsRef in self.initInputs:
total.add(dsRef.datasetType)
if initOutputs:
for dsRef in self.initOutputs:
total.add(dsRef.datasetType)
if inputs:
total |= self._inputDatasetTypes
if outputs:
total |= self._outputDatasetTypes
return total

0 comments on commit 76b0903

Please sign in to comment.