Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-19988: rewrite of QuantumGraph generation #95

Merged
merged 7 commits into from
Jul 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ sudo: false
language: python
matrix:
include:
- python: '3.6'
- python: '3.7'
install:
- pip install flake8
script: flake8
126 changes: 67 additions & 59 deletions python/lsst/pipe/base/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations

"""Module defining quantum graph classes and related methods.

Expand All @@ -35,13 +36,16 @@
# Imports of standard modules --
# -------------------------------
from itertools import chain
from dataclasses import dataclass
from typing import List, FrozenSet, Mapping

# -----------------------------
# Imports for other modules --
# -----------------------------
from .pipeline import Pipeline
from .pipeline import Pipeline, TaskDef
from .pipeTools import orderPipeline
from lsst.daf.butler import DataId
from lsst.daf.butler import DataId, Quantum, DatasetRef, DatasetType
from lsst.daf.butler.core.utils import NamedKeyDict

# ----------------------------------
# Local non-exported definitions --
Expand All @@ -52,41 +56,36 @@
# ------------------------


@dataclass
class QuantumIterData:
"""Helper class for iterating over quanta in a graph.

`QuantumGraph.traverse` method needs to return topologically ordered
The `QuantumGraph.traverse` method needs to return topologically ordered
Quanta together with their dependencies. This class is used as a value
for iterator, it contains enumerated Quantum and its dependencies.

Parameters
----------
quantumId : `int`
Index of this Quantum, unique but arbitrary integer.
quantum : `~lsst.daf.butler.Quantum`
Quantum corresponding to a graph node.
taskDef : `TaskDef`
Task to be run on this quantum.
dependencies : iterable of `int`
Possibly empty sequence of indices of dependencies for this Quantum.
Prerequisites include other nodes in the graph; they do not reflect
data already in butler (there are no graph nodes for those).
for the iterator, it contains enumerated Quantum and its dependencies.
"""

__slots__ = ["quantumId", "quantum", "taskDef", "dependencies"]
__slots__ = ["index", "quantum", "taskDef", "dependencies"]

def __init__(self, quantumId, quantum, taskDef, dependencies):
self.quantumId = quantumId
self.quantum = quantum
self.taskDef = taskDef
self.dependencies = frozenset(dependencies)
index: int
"""Index of this Quantum, a unique but arbitrary integer."""

quantum: Quantum
"""Quantum corresponding to a graph node."""

taskDef: TaskDef
"""Task class to be run on this quantum, and corresponding label and
config.
"""

def __str__(self):
return "QuantumIterData({}, {}, {})".format(self.quantumId,
self.taskDef,
self.dependencies)
dependencies: FrozenSet(int)
"""Possibly empty set of indices of dependencies for this Quantum.
Dependencies include other nodes in the graph; they do not reflect data
already in butler (there are no graph nodes for those).
"""


@dataclass
class QuantumGraphTaskNodes:
"""QuantumGraphTaskNodes represents a bunch of nodes in a quantum graph
corresponding to a single task.
Expand All @@ -103,17 +102,19 @@ class QuantumGraphTaskNodes:
Different frameworks may use different graph representation, this
representation was based mostly on requirements of command-line
executor which does not need explicit edges information.

Attributes
----------
taskDef : `TaskDef`
Task definition for this set of nodes.
quanta : `list` of `~lsst.daf.butler.Quantum`
List of quanta corresponding to the task.
"""
def __init__(self, taskDef, quanta):
self.taskDef = taskDef
self.quanta = quanta

taskDef: TaskDef
"""Task definition for this set of nodes."""

quanta: List[Quantum]
"""List of quanta corresponding to the task."""

initInputs: Mapping[DatasetType, DatasetRef]
"""Datasets that must be loaded or created to construct this task."""

initOutputs: Mapping[DatasetType, DatasetRef]
"""Datasets that may be written after constructing this task."""


class QuantumGraph(list):
Expand All @@ -130,10 +131,30 @@ class QuantumGraph(list):
"""
def __init__(self, iterable=None):
list.__init__(self, iterable or [])
self.initInputs = []
self.initOutputs = []
self._inputDatasetTypes = set()
self._outputDatasetTypes = set()
self.initInputs = NamedKeyDict()
self.initIntermediates = NamedKeyDict()
self.initOutputs = NamedKeyDict()

initInputs: NamedKeyDict
"""Datasets that must be provided to construct one or more Tasks in this
graph, and must be obtained from the data repository.

This is disjoint with both `initIntermediates` and `initOutputs`.
"""

initIntermediates: NamedKeyDict
"""Datasets that must be provided to construct one or more Tasks in this
graph, but are also produced after constructing a Task in this graph.

This is disjoint with both `initInputs` and `initOutputs`.
"""

initOutputs: NamedKeyDict
"""Datasets that are produced after constructing a Task in this graph,
and are not used to construct any other Task in this graph.

This is disjoint from both `initInputs` and `initIntermediates`.
"""

def quanta(self):
"""Iterator over quanta in a graph.
Expand All @@ -157,11 +178,11 @@ def traverse(self):

This method iterates over all Quanta in topological order, enumerating
them during iteration. Returned `QuantumIterData` object contains
Quantum instance, its ``quantumId`` and ``quantumId`` of all its
Quantum instance, its ``index`` and the ``index`` of all its
prerequisites (Quanta that produce inputs for this Quantum):
- the ``quantumId`` values are generated by an iteration of a
- the ``index`` values are generated by an iteration of a
QuantumGraph, and are not intrinsic to the QuantumGraph
- during iteration, each ID will appear in quantumId before it ever
- during iteration, each ID will appear in index before it ever
appears in dependencies.

Yields
Expand Down Expand Up @@ -212,19 +233,6 @@ def orderedTaskNodes(graph):
key = (dataRef.datasetType.name, DataId(dataRef.dataId))
outputs[key] = index

yield QuantumIterData(index, quantum, nodes.taskDef, prereq)
yield QuantumIterData(index=index, quantum=quantum, taskDef=nodes.taskDef,
dependencies=frozenset(prereq))
index += 1

def getDatasetTypes(self, initInputs=True, initOutputs=True, inputs=True, outputs=True):
total = set()
if initInputs:
for dsRef in self.initInputs:
total.add(dsRef.datasetType)
if initOutputs:
for dsRef in self.initOutputs:
total.add(dsRef.datasetType)
if inputs:
total |= self._inputDatasetTypes
if outputs:
total |= self._outputDatasetTypes
return total