Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

DM-26371: Make Quantum immutable #382

Merged
merged 1 commit into from
Sep 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
175 changes: 43 additions & 132 deletions python/lsst/daf/butler/core/quantum.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@
Type,
TYPE_CHECKING,
Union,
Tuple,
Any
)

import astropy.time

from lsst.utils import doImport

from .named import NamedKeyDict
from .named import NamedKeyDict, NamedKeyMapping

if TYPE_CHECKING:
from .dimensions import DataCoordinate
Expand All @@ -64,74 +64,43 @@ class Quantum:
provided.
dataId : `DataId`, optional
The dimension values that identify this `Quantum`.
run : `str`, optional
The name of the run this Quantum is a part of.
initInputs : collection of `DatasetRef`, optional
Datasets that are needed to construct an instance of the Task. May
be a flat iterable of `DatasetRef` instances or a mapping from
`DatasetType` to `DatasetRef`.
predictedInputs : `~collections.abc.Mapping`, optional
inputs : `~collections.abc.Mapping`, optional
Inputs identified prior to execution, organized as a mapping from
`DatasetType` to a list of `DatasetRef`. Must be a superset of
``actualInputs``.
actualInputs : `~collections.abc.Mapping`, optional
Inputs actually used during execution, organized as a mapping from
`DatasetType` to a list of `DatasetRef`. Must be a subset of
``predictedInputs``.
`DatasetType` to a list of `DatasetRef`.
outputs : `~collections.abc.Mapping`, optional
Outputs from executing this quantum of work, organized as a mapping
from `DatasetType` to a list of `DatasetRef`.
startTime : `astropy.time.Time`
The start time for the quantum.
endTime : `astropy.time.Time`
The end time for the quantum.
host : `str`
    The system on which this quantum was executed.
id : `int`, optional
Unique integer identifier for this quantum. Usually set to `None`
(default) and assigned by `Registry`.
"""

__slots__ = ("_taskName", "_taskClass", "_dataId", "_run",
"_initInputs", "_predictedInputs", "_actualInputs", "_outputs",
"_id", "_startTime", "_endTime", "_host")
__slots__ = ("_taskName", "_taskClass", "_dataId", "_initInputs", "_inputs", "_outputs", "_hash")

def __init__(self, *, taskName: Optional[str] = None,
taskClass: Optional[Type] = None,
dataId: Optional[DataCoordinate] = None,
run: Optional[str] = None,
initInputs: Optional[Union[Mapping[DatasetType, DatasetRef], Iterable[DatasetRef]]] = None,
predictedInputs: Optional[Mapping[DatasetType, List[DatasetRef]]] = None,
actualInputs: Optional[Mapping[DatasetType, List[DatasetRef]]] = None,
inputs: Optional[Mapping[DatasetType, List[DatasetRef]]] = None,
outputs: Optional[Mapping[DatasetType, List[DatasetRef]]] = None,
startTime: Optional[astropy.time.Time] = None,
endTime: Optional[astropy.time.Time] = None,
host: Optional[str] = None,
id: Optional[int] = None):
):
if taskClass is not None:
taskName = f"{taskClass.__module__}.{taskClass.__name__}"
self._taskName = taskName
self._taskClass = taskClass
self._run = run
self._dataId = dataId
if initInputs is None:
initInputs = {}
elif not isinstance(initInputs, Mapping):
initInputs = {ref.datasetType: ref for ref in initInputs}
if predictedInputs is None:
predictedInputs = {}
if actualInputs is None:
actualInputs = {}
if inputs is None:
inputs = {}
if outputs is None:
outputs = {}
self._initInputs: NamedKeyDict[DatasetType, DatasetRef] = NamedKeyDict(initInputs)
self._predictedInputs: NamedKeyDict[DatasetType, List[DatasetRef]] = NamedKeyDict(predictedInputs)
self._actualInputs: NamedKeyDict[DatasetType, List[DatasetRef]] = NamedKeyDict(actualInputs)
self._outputs: NamedKeyDict[DatasetType, List[DatasetRef]] = NamedKeyDict(outputs)
self._id = id
self._startTime = startTime
self._endTime = endTime
self._host = host
self._initInputs: NamedKeyMapping[DatasetType, DatasetRef] = NamedKeyDict(initInputs).freeze()
self._inputs: NamedKeyMapping[DatasetType, List[DatasetRef]] = NamedKeyDict(inputs).freeze()
self._outputs: NamedKeyMapping[DatasetType, List[DatasetRef]] = NamedKeyDict(outputs).freeze()

@property
def taskClass(self) -> Optional[Type]:
Expand All @@ -147,28 +116,22 @@ def taskName(self) -> Optional[str]:
"""
return self._taskName

@property
def run(self) -> Optional[str]:
    """Name of the run that this `Quantum` belongs to (`str` or `None`)."""
    return self._run

@property
def dataId(self) -> Optional[DataCoordinate]:
    """Dimension values identifying this unit of processing (`DataId`)."""
    return self._dataId

@property
def initInputs(self) -> NamedKeyDict[DatasetType, DatasetRef]:
def initInputs(self) -> NamedKeyMapping[DatasetType, DatasetRef]:
"""A mapping of datasets used to construct the Task,
with `DatasetType` instances as keys (names can also be used for
lookups) and `DatasetRef` instances as values.
"""
return self._initInputs

@property
def predictedInputs(self) -> NamedKeyDict[DatasetType, List[DatasetRef]]:
def inputs(self) -> NamedKeyMapping[DatasetType, List[DatasetRef]]:
"""A mapping of input datasets that were expected to be used,
with `DatasetType` instances as keys (names can also be used for
lookups) and a list of `DatasetRef` instances as values.
Expand All @@ -179,23 +142,10 @@ def predictedInputs(self) -> NamedKeyDict[DatasetType, List[DatasetRef]]:
`DatasetRef` instances cannot be compared reliably when some have
    integer IDs and others do not.
"""
return self._predictedInputs

@property
def actualInputs(self) -> NamedKeyDict[DatasetType, List[DatasetRef]]:
"""A mapping of input datasets that were actually used, with the same
form as `Quantum.predictedInputs`.

Notes
-----
We cannot use `set` instead of `list` for the nested container because
`DatasetRef` instances cannot be compared reliably when some have
    integer IDs and others do not.
"""
return self._actualInputs
return self._inputs

@property
def outputs(self) -> NamedKeyDict[DatasetType, List[DatasetRef]]:
def outputs(self) -> NamedKeyMapping[DatasetType, List[DatasetRef]]:
"""A mapping of output datasets (to be) generated for this quantum,
with the same form as `predictedInputs`.

Expand All @@ -207,68 +157,29 @@ def outputs(self) -> NamedKeyDict[DatasetType, List[DatasetRef]]:
"""
return self._outputs

def addPredictedInput(self, ref: DatasetRef) -> None:
    """Append ``ref`` to this quantum's predicted inputs.

    Note that a `Registry` is not updated automatically; every entry in
    `predictedInputs` must already be present when `Registry.addQuantum()`
    is invoked.

    Parameters
    ----------
    ref : `DatasetRef`
        Dataset reference to record among the predicted inputs.
    """
    refs = self._predictedInputs.setdefault(ref.datasetType, [])
    refs.append(ref)

def _markInputUsed(self, ref: DatasetRef) -> None:
    """Flag a predicted input as actually having been used.

    A `Registry` is not updated by this call; use
    `Registry.markInputUsed()` for that.
    """
    datasetType = ref.datasetType
    # Validate the reference against the predicted inputs before recording.
    predicted = self._predictedInputs.get(datasetType)
    if predicted is None:
        raise ValueError(f"Dataset type {datasetType.name} not in predicted inputs")
    if ref not in predicted:
        raise ValueError(f"Actual input {ref} was not predicted")
    # Record the reference among the actual inputs.
    self._actualInputs.setdefault(datasetType, []).append(ref)

def addOutput(self, ref: DatasetRef) -> None:
    """Append ``ref`` to this quantum's outputs.

    Note that a `Registry` is not updated automatically; every entry in
    `outputs` must already be present when `Registry.addQuantum()` is
    invoked.

    Parameters
    ----------
    ref : `DatasetRef`
        Dataset reference to record among the outputs.
    """
    refs = self._outputs.setdefault(ref.datasetType, [])
    refs.append(ref)

@property
def id(self) -> Optional[int]:
    """Autoincrement integer uniquely identifying this quantum (`int`)."""
    return self._id

@property
def startTime(self) -> Optional[astropy.time.Time]:
    """Timestamp at which execution of this quantum began
    (`astropy.time.Time`).
    """
    return self._startTime

@property
def endTime(self) -> Optional[astropy.time.Time]:
    """Timestamp at which execution of this quantum finished
    (`astropy.time.Time`).
    """
    return self._endTime

@property
def host(self) -> Optional[str]:
    """Name of the machine on which this quantum was executed (`str`)."""
    return self._host
def __eq__(self, other: object) -> bool:
    # Two quanta are equal when the task identity, data ID, and all three
    # dataset mappings agree; anything that is not a Quantum never compares
    # equal.
    if not isinstance(other, Quantum):
        return False
    attrs = ("taskClass", "dataId", "initInputs", "inputs", "outputs")
    return all(getattr(self, a) == getattr(other, a) for a in attrs)

def __hash__(self) -> int:
    # Only the task identity and data ID participate in the hash; the
    # dataset mappings are excluded (consistent with but weaker than __eq__).
    key = (self.taskClass, self.dataId)
    return hash(key)

def __reduce__(self) -> Union[str, Tuple[Any, ...]]:
    # Pickle through the factory so the frozen mappings round-trip as
    # plain dicts that the constructor re-freezes.
    state = (self.taskName, self.taskClass, self.dataId,
             dict(self.initInputs.items()), dict(self.inputs),
             dict(self.outputs))
    return (self._reduceFactory, state)

@staticmethod
def _reduceFactory(taskName: Optional[str],
                   taskClass: Optional[Type],
                   dataId: Optional[DataCoordinate],
                   initInputs: Optional[Union[Mapping[DatasetType, DatasetRef], Iterable[DatasetRef]]],
                   inputs: Optional[Mapping[DatasetType, List[DatasetRef]]],
                   outputs: Optional[Mapping[DatasetType, List[DatasetRef]]]
                   ) -> Quantum:
    # Unpickling helper for __reduce__: rebuild a Quantum from saved state.
    return Quantum(taskName=taskName, taskClass=taskClass, dataId=dataId,
                   initInputs=initInputs, inputs=inputs, outputs=outputs)
61 changes: 13 additions & 48 deletions tests/test_quantum.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import unittest
import astropy.time

from lsst.daf.butler import Quantum, DimensionUniverse, NamedKeyDict, StorageClass, DatasetType, DatasetRef

Expand All @@ -36,62 +35,28 @@ def testConstructor(self):
"""Test of constructor.
"""
# Quantum specific arguments
run = None # TODO add Run
taskName = "some.task.object" # can't use a real PipelineTask due to inverted package dependency
# Base class arguments
startTime = astropy.time.Time("2018-01-01", format="iso", scale="utc")
endTime = astropy.time.Time("2018-01-02", format="iso", scale="utc")
host = "localhost"
quantum = Quantum(taskName=taskName, run=run, startTime=startTime, endTime=endTime, host=host)

quantum = Quantum(taskName=taskName)
self.assertEqual(quantum.taskName, taskName)
self.assertEqual(quantum.run, run)
self.assertEqual(quantum.predictedInputs, NamedKeyDict())
self.assertEqual(quantum.actualInputs, NamedKeyDict())
self.assertEqual(quantum.initInputs, {})
self.assertEqual(quantum.inputs, NamedKeyDict())
self.assertEqual(quantum.outputs, {})
self.assertIsNone(quantum.dataId)
self.assertIsNone(quantum.id)
self.assertEqual(quantum.startTime, startTime)
self.assertEqual(quantum.endTime, endTime)
self.assertEqual(quantum.host, host)

def testAddInputsOutputs(self):
"""Test of addPredictedInput() method.
"""
quantum = Quantum(taskName="some.task.object", run=None)

# start with empty
self.assertEqual(quantum.predictedInputs, dict())
universe = DimensionUniverse()
instrument = "DummyCam"
datasetTypeName = "test_ds"
storageClass = StorageClass("testref_StructuredData")
datasetType = DatasetType(datasetTypeName, universe.extract(("instrument", "visit")), storageClass)

# add one ref
ref = DatasetRef(datasetType, dict(instrument=instrument, visit=42))
quantum.addPredictedInput(ref)
self.assertIn(datasetTypeName, quantum.predictedInputs)
self.assertEqual(len(quantum.predictedInputs[datasetTypeName]), 1)
# add second ref
ref = DatasetRef(datasetType, dict(instrument=instrument, visit=43))
quantum.addPredictedInput(ref)
self.assertEqual(len(quantum.predictedInputs[datasetTypeName]), 2)

# mark last ref as actually used
self.assertEqual(quantum.actualInputs, dict())
quantum._markInputUsed(ref)
self.assertIn(datasetTypeName, quantum.actualInputs)
self.assertEqual(len(quantum.actualInputs[datasetTypeName]), 1)

# add couple of outputs too
self.assertEqual(quantum.outputs, dict())
ref = DatasetRef(datasetType, dict(instrument=instrument, visit=42))
quantum.addOutput(ref)
self.assertIn(datasetTypeName, quantum.outputs)
self.assertEqual(len(quantum.outputs[datasetTypeName]), 1)

ref = DatasetRef(datasetType, dict(instrument=instrument, visit=43))
quantum.addOutput(ref)
self.assertEqual(len(quantum.outputs[datasetTypeName]), 2)
predictedInputs = {datasetType: [DatasetRef(datasetType, dict(instrument=instrument, visit=42)),
DatasetRef(datasetType, dict(instrument=instrument, visit=43))]}
outputs = {datasetType: [DatasetRef(datasetType, dict(instrument=instrument, visit=42)),
DatasetRef(datasetType, dict(instrument=instrument, visit=43))]}

quantum = Quantum(taskName=taskName, inputs=predictedInputs, outputs=outputs)
self.assertEqual(len(quantum.inputs[datasetType]), 2)
self.assertEqual(len(quantum.outputs[datasetType]), 2)


if __name__ == "__main__":
Expand Down