Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tickets/DM-29345 Add method to get QuantumNodes by TaskDef #175

Merged
merged 1 commit into from
Mar 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 18 additions & 0 deletions python/lsst/pipe/base/graph/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ def _buildGraphs(self,

nodeNumberGenerator = count()
self._nodeIdMap: Dict[NodeId, QuantumNode] = {}
self._taskToQuantumNode: DefaultDict[TaskDef, Set[QuantumNode]] = defaultdict(set)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this is redundant with self._quanta, given that you can get the quantum of a QuantumNode.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I thought about that for my first pass at implementing this. However, that would involve either having a mapping of quantum to NodeId (which is easy enough) or QuantumNode. This would involve a pass of looking up all the quanta, loop over them, and then mapping those to QuantumNodes, and then returning this (with a second hop for NodeId for course) I figured that in large graphs it would be worth the memory trade off vs executing a loop and extra mapping lookups in terms of run time.

There are a few different ways to store all this data internally, but I tried to opt for fast lookups vs efficient packing, as the structures themselves are not duplicated, just extra pointers to the structs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

of course, this could be done the other way, of looking up all the QuantumNodes, extracting the Quantum from then and returning that, but I already had to have the data structure contained in memory of TaskDef to Set[Quantum] to construct the graph, so it seems wasteful to not just use that (again unless we want to free up a dict of pointers)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, clearly there's several ways of doing it. I wasn't actually thinking about memory usage, but about complexity -- redundant data structures are harder to keep in a self-consistent state, especially in the presence of exceptions.

self._count = 0
for taskDef, quantumSet in self._quanta.items():
connections = taskDef.connections
Expand Down Expand Up @@ -134,6 +135,7 @@ def _buildGraphs(self,
inits = quantum.initInputs.values()
inputs = quantum.inputs.values()
value = QuantumNode(quantum, taskDef, nodeId)
self._taskToQuantumNode[taskDef].add(value)
self._nodeIdMap[nodeId] = value

for dsRef in chain(inits, inputs):
Expand Down Expand Up @@ -268,6 +270,22 @@ def getQuantaForTask(self, taskDef: TaskDef) -> FrozenSet[Quantum]:
"""
return frozenset(self._quanta[taskDef])

def getNodesForTask(self, taskDef: TaskDef) -> FrozenSet[QuantumNode]:
"""Return all the `QuantumNodes` associated with a `TaskDef`.

Parameters
----------
taskDef : `TaskDef`
The `TaskDef` for which `Quantum` are to be queried

Returns
-------
frozenset of `QuantumNodes`
The `frozenset` of `QuantumNodes` that is associated with the
specified `TaskDef`.
"""
return frozenset(self._taskToQuantumNode[taskDef])

def findTasksWithInput(self, datasetTypeName: DatasetTypeName) -> Iterable[TaskDef]:
"""Find all tasks that have the specified dataset type name as an
input.
Expand Down
9 changes: 8 additions & 1 deletion tests/test_quantumGraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import os
import pickle
import tempfile
from typing import Iterable
import unittest
import random
from lsst.daf.butler import DimensionUniverse
Expand All @@ -32,7 +33,7 @@
import lsst.pipe.base.connectionTypes as cT
from lsst.daf.butler import Quantum, DatasetRef, DataCoordinate, DatasetType, Config
from lsst.pex.config import Field
from lsst.pipe.base.graph.quantumNode import NodeId, BuildId
from lsst.pipe.base.graph.quantumNode import NodeId, BuildId, QuantumNode
import lsst.utils.tests

try:
Expand Down Expand Up @@ -251,6 +252,12 @@ def testGetQuantaForTask(self):
for task in self.tasks:
self.assertEqual(self.qGraph.getQuantaForTask(task), self.quantumMap[task])

def testGetNodesForTask(self):
for task in self.tasks:
nodes: Iterable[QuantumNode] = self.qGraph.getNodesForTask(task)
quanta_in_node = set(n.quantum for n in nodes)
self.assertEqual(quanta_in_node, self.quantumMap[task])

def testFindTasksWithInput(self):
self.assertEqual(tuple(self.qGraph.findTasksWithInput(DatasetTypeName("Dummy1Output")))[0],
self.tasks[1])
Expand Down