
DM-19988: support code (and removal of dead code) for QuantumGraph generation rewrite #169

Merged · 14 commits · Jul 12, 2019
9 changes: 9 additions & 0 deletions config/dimensions.yaml
@@ -160,6 +160,15 @@ dimensions:
lhs: [exposure]
rhs: [calibration_label]

visit_calibration_label_join:
doc: >
A many-to-many join table that relates visit to calibration_label.

This can and probably should be implemented as a view.
lhs: [visit]
rhs: [calibration_label]
summarizes: [exposure_calibration_label_join]

visit_detector_region:
doc: >
A many-to-many join table that provides region information for
1 change: 0 additions & 1 deletion config/registry.yaml
@@ -3,7 +3,6 @@ registry:
cls: lsst.daf.butler.registries.sqliteRegistry.SqliteRegistry
db: 'sqlite:///:memory:'
limited: false
deferDatasetIdQueries: true
skypix:
cls: lsst.sphgeom.HtmPixelization
level: 7
60 changes: 60 additions & 0 deletions config/schema.yaml
@@ -1030,6 +1030,66 @@ schema:
);
materialize: false

visit_calibration_label_join:
doc: >
A many-to-many join table that relates visit to calibration_label.

This can and probably should be implemented as a view.
columns:
-
name: instrument
type: string
length: 16
primary_key: true
nullable: false
doc: >
Name of the instrument associated with these master calibration
products.
-
name: calibration_label
type: string
length: 128
primary_key: true
nullable: false
doc: label used to identify a group of master calibration products.
-
name: visit
type: int
primary_key: true
nullable: false
doc: >
Unique (with instrument) integer identifier for a visit.
foreignKeys:
-
src: instrument
tgt: instrument.instrument
-
src:
- instrument
- visit
tgt:
- visit.instrument
- visit.visit
-
src:
- instrument
- calibration_label
tgt:
- calibration_label.instrument
- calibration_label.calibration_label
sql: >
SELECT DISTINCT
visit.instrument, visit.visit, calibration_label.calibration_label
FROM
visit INNER JOIN calibration_label ON (
visit.instrument = calibration_label.instrument
AND
visit.datetime_begin >= calibration_label.valid_first
AND
visit.datetime_end <= calibration_label.valid_last
);
materialize: false
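
The SQL above encodes a temporal-containment rule: a visit is associated with a calibration_label only when the visit's time span lies entirely inside the label's validity range. A standalone sketch of that predicate (plain dicts stand in for table rows; this is not butler code):

```python
# Standalone sketch (not butler code) of the containment test the SQL view
# above encodes; plain dicts stand in for visit and calibration_label rows.
def visit_matches_label(visit, label):
    return (visit["instrument"] == label["instrument"]
            and visit["datetime_begin"] >= label["valid_first"]
            and visit["datetime_end"] <= label["valid_last"])

visit = {"instrument": "HSC", "datetime_begin": 100, "datetime_end": 130}
label = {"instrument": "HSC", "valid_first": 90, "valid_last": 200}
assert visit_matches_label(visit, label)   # visit lies inside validity range
```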

visit_detector_region:
doc: >
A one-to-many join table that stores the spatial region
20 changes: 20 additions & 0 deletions python/lsst/daf/butler/core/dimensions/dataId.py
@@ -581,3 +581,23 @@ def __getnewargs_ex__(self):
args = (None,)
kwargs = dict(dimensions=self.dimensions())
return (args, kwargs)

def matches(self, other):
"""Compare two data IDs with possibly differing dimensions.

Parameters
----------
other : `DataId` or `dict`
Other data ID to compare to.

Returns
-------
relationship : `bool` or `None`
`True` if all of the keys that ``self`` and ``other`` have in common
have the same values; `False` if one or more common keys have different
values; `None` if there are no keys in common.
"""
keys = self.keys() & other.keys()
if not keys:
return None
return all(self[k] == other[k] for k in keys)
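
A quick illustration of the three-valued semantics (plain dicts stand in for `DataId`, which exposes the same mapping interface used here):

```python
# Hypothetical illustration of DataId.matches(); plain dicts stand in for
# DataId instances, which expose the same keys()/[] mapping interface.
def matches(a, b):
    keys = a.keys() & b.keys()
    if not keys:
        return None                          # no dimensions in common
    return all(a[k] == b[k] for k in keys)

assert matches({"instrument": "HSC", "visit": 42},
               {"instrument": "HSC", "detector": 5}) is True   # shared key agrees
assert matches({"instrument": "HSC"}, {"instrument": "DECam"}) is False
assert matches({"visit": 42}, {"detector": 5}) is None         # disjoint keys
```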
9 changes: 9 additions & 0 deletions python/lsst/daf/butler/core/dimensions/graph.py
@@ -423,10 +423,19 @@ def __init__(self):
super().__init__(universe=self, elements=elements, dimensions=dimensions, joins=joins)
self._backrefs = {}
self._subgraphCache = {}
self._empty = None

def __repr__(self):
return f"DimensionUniverse({self._dimensions}, joins={self._joins})"

@property
def empty(self):
"""Return an empty graph in this universe (`DimensionGraph`).
"""
if self._empty is None:
self._empty = self.extract()
return self._empty

def extract(self, dimensions=(), joins=(), implied=False):
r"""Return a new graph containing the given elements.

152 changes: 105 additions & 47 deletions python/lsst/daf/butler/core/quantum.py
@@ -21,49 +21,88 @@

__all__ = ("Quantum",)

from .utils import slotValuesAreEqual
from lsst.utils import doImport

from .utils import NamedKeyDict
from .execution import Execution


class Quantum(Execution):
"""A discrete unit of work that may depend on one or more datasets and
produces one or more datasets.

Most Quanta will be executions of a particular `SuperTask`’s `runQuantum`
method, but they can also be used to represent discrete units of work
performed manually by human operators or other software agents.
Most Quanta will be executions of a particular ``PipelineTask``’s
``runQuantum`` method, but they can also be used to represent discrete
units of work performed manually by human operators or other software
agents.

Parameters
----------
task : `str` or `SuperTask`
Fully-qualified name of the SuperTask that executed this Quantum.
run : `Run`
taskName : `str`, optional
Fully-qualified name of the Task class that executed or will execute
this Quantum. If not provided, ``taskClass`` must be.
taskClass : `type`, optional
The Task class that executed or will execute this Quantum. If not
provided, ``taskName`` must be. Overrides ``taskName`` if both are
provided.
dataId : `DataId`, optional
The dimension values that identify this `Quantum`.
run : `Run`, optional
The Run this Quantum is a part of.
initInputs : collection of `DatasetRef`, optional
Datasets that are needed to construct an instance of the Task. May
be a flat iterable of `DatasetRef` instances or a mapping from
`DatasetType` to `DatasetRef`.
predictedInputs : `~collections.abc.Mapping`, optional
Inputs identified prior to execution, organized as a mapping from
`DatasetType` to a list of `DatasetRef`. Must be a superset of
``actualInputs``.
actualInputs : `~collections.abc.Mapping`, optional
Inputs actually used during execution, organized as a mapping from
`DatasetType` to a list of `DatasetRef`. Must be a subset of
``predictedInputs``.
outputs : `~collections.abc.Mapping`, optional
Outputs from executing this quantum of work, organized as a mapping
from `DatasetType` to a list of `DatasetRef`.
kwargs
Additional arguments are forwarded to the base `Execution` constructor.
"""

__slots__ = ("_task", "_run", "_predictedInputs", "_actualInputs", "_outputs")
__eq__ = slotValuesAreEqual

def __init__(self, task, run, *args, **kwargs):
super().__init__(*args, **kwargs)
self._task = task
__slots__ = ("_taskName", "_taskClass", "_dataId", "_run",
"_initInputs", "_predictedInputs", "_actualInputs", "_outputs")

def __init__(self, *, taskName=None, taskClass=None, dataId=None, run=None,
initInputs=None, predictedInputs=(), actualInputs=(), outputs=(),
**kwargs):
super().__init__(**kwargs)
if taskClass is not None:
taskName = f"{taskClass.__module__}.{taskClass.__name__}"
Review comment (Contributor):
Everyone seems to prefer f-strings for simple string operations; I still like the old-fashioned

```python
taskClass.__module__ + "." + taskClass.__name__
```

Reply (Member Author):
Yeah, I do like the f-string version better. In this case there isn't much of a difference, but I like using the same pattern for simple and more complex cases that seem to be part of the same family of patterns, and I think this qualifies.

self._taskName = taskName
self._taskClass = taskClass
self._run = run
self._predictedInputs = {}
self._actualInputs = {}
self._outputs = {}
self._dataId = dataId
if initInputs is None:
initInputs = {}
elif not hasattr(initInputs, "keys"):
initInputs = {ref.datasetType: ref for ref in initInputs}
Review comment (Contributor):
Could it be that there is more than one ref in initInputs with the same dataset type?

Reply (Member Author):
No, the PipelineTask declarations already assume all initInput and initOutput dataset types are scalar (and they have to be because they have empty data IDs).

self._initInputs = NamedKeyDict(initInputs)
self._predictedInputs = NamedKeyDict(predictedInputs)
self._actualInputs = NamedKeyDict(actualInputs)
self._outputs = NamedKeyDict(outputs)

@property
def taskClass(self):
"""Task class associated with this `Quantum` (`type`).
"""
if self._taskClass is None:
self._taskClass = doImport(self._taskName)
return self._taskClass

@property
def task(self):
"""Task associated with this `Quantum`.

If the `Quantum` is associated with a `SuperTask`, this is the
`SuperTask` instance that produced and should execute this set of
inputs and outputs. If not, a human-readable string identifier
for the operation. Some Registries may permit the value to be
`None`, but are not required to in general.
def taskName(self):
"""Fully-qualified name of the task associated with `Quantum` (`str`).
"""
return self._task
return self._taskName

@property
def run(self):
@@ -72,34 +72,56 @@ def run(self):
return self._run

@property
def predictedInputs(self):
r"""A `dict` of input datasets that were expected to be used,
with `DatasetType` names as keys and a list of `DatasetRef` instances
as values.
def dataId(self):
"""The dimension values of the unit of processing (`DataId`).
"""
return self._dataId

Input `Datasets` that have already been stored may be
`DatasetRef`\ s, and in many contexts may be guaranteed to be.
Read-only; update via `Quantum.addPredictedInput()`.
@property
def initInputs(self):
"""A mapping of datasets used to construct the Task,
with `DatasetType` instances as keys (names can also be used for
lookups) and `DatasetRef` instances as values.
"""
return self._initInputs

@property
def predictedInputs(self):
"""A mapping of input datasets that were expected to be used,
with `DatasetType` instances as keys (names can also be used for
lookups) and a list of `DatasetRef` instances as values.

Notes
-----
We cannot use `set` instead of `list` for the nested container because
`DatasetRef` instances cannot be compared reliably when some have
integer IDs and others do not.
"""
return self._predictedInputs
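
The "names can also be used for lookups" behavior comes from `NamedKeyDict`: keys are objects with a `.name` attribute, and lookups accept either the key object or its string name. A minimal sketch of that behavior (an assumption for illustration, not the real implementation):

```python
# Minimal sketch (not the real NamedKeyDict) of the lookup behavior the
# docstrings above rely on: keys carry a .name attribute, and __getitem__
# accepts either the key object or its string name.
class NamedKeyDictSketch(dict):
    def __getitem__(self, key):
        if isinstance(key, str):
            # Translate a name into the matching key object first.
            key = next(k for k in self if k.name == key)
        return super().__getitem__(key)

class FakeDatasetType:
    def __init__(self, name):
        self.name = name

calexp = FakeDatasetType("calexp")
d = NamedKeyDictSketch({calexp: ["ref1"]})
assert d[calexp] == ["ref1"]     # lookup by instance
assert d["calexp"] == ["ref1"]   # lookup by name
```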

@property
def actualInputs(self):
"""A `dict` of input datasets that were actually used, with the same
"""A mapping of input datasets that were actually used, with the same
form as `Quantum.predictedInputs`.

All returned sets must be subsets of those in `predictedInputs`.

Read-only; update via `Registry.markInputUsed()`.
Notes
-----
We cannot use `set` instead of `list` for the nested container because
`DatasetRef` instances cannot be compared reliably when some have
integer IDs and others do not.
"""
return self._actualInputs

@property
def outputs(self):
"""A `dict` of output datasets (to be) generated for this quantum,
"""A mapping of output datasets (to be) generated for this quantum,
with the same form as `predictedInputs`.

Read-only; update via `addOutput()`.
Notes
-----
We cannot use `set` instead of `list` for the nested container because
`DatasetRef` instances cannot be compared reliably when some have
integer IDs and others do not.
"""
return self._outputs

@@ -114,23 +175,21 @@ def addPredictedInput(self, ref):
ref : `DatasetRef`
Reference for a Dataset to add to the Quantum's predicted inputs.
"""
datasetTypeName = ref.datasetType.name
self._predictedInputs.setdefault(datasetTypeName, []).append(ref)
self._predictedInputs.setdefault(ref.datasetType, []).append(ref)

def _markInputUsed(self, ref):
"""Mark an input as used.

This does not automatically update a `Registry`.
For that use `Registry.markInputUsed()` instead.
"""
datasetTypeName = ref.datasetType.name
# First validate against predicted
if datasetTypeName not in self._predictedInputs:
raise ValueError("Dataset type {} not in predicted inputs".format(datasetTypeName))
if ref not in self._predictedInputs[datasetTypeName]:
if ref.datasetType not in self._predictedInputs:
raise ValueError("Dataset type {} not in predicted inputs".format(ref.datasetType.name))
if ref not in self._predictedInputs[ref.datasetType]:
raise ValueError("Actual input {} was not predicted".format(ref))
# Now insert as actual
self._actualInputs.setdefault(datasetTypeName, []).append(ref)
self._actualInputs.setdefault(ref.datasetType, []).append(ref)

def addOutput(self, ref):
"""Add an output `DatasetRef` to the `Quantum`.
Expand All @@ -143,5 +202,4 @@ def addOutput(self, ref):
ref : `DatasetRef`
Reference for a Dataset to add to the Quantum's outputs.
"""
datasetTypeName = ref.datasetType.name
self._outputs.setdefault(datasetTypeName, []).append(ref)
self._outputs.setdefault(ref.datasetType, []).append(ref)
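
Taken together, `addPredictedInput` and `_markInputUsed` enforce a contract: every actual input must first appear among the predicted inputs. A standalone sketch of that contract (plain stand-ins, not the real butler classes):

```python
# Standalone sketch (plain stand-ins, not real butler classes) of the
# predicted/actual-input contract enforced by _markInputUsed above.
from collections import namedtuple

DatasetType = namedtuple("DatasetType", ["name"])
DatasetRef = namedtuple("DatasetRef", ["datasetType", "dataId"])

predicted, actual = {}, {}

def add_predicted(ref):
    predicted.setdefault(ref.datasetType, []).append(ref)

def mark_used(ref):
    # Validate against the predicted inputs before recording as actual.
    if ref.datasetType not in predicted:
        raise ValueError(f"Dataset type {ref.datasetType.name} not in predicted inputs")
    if ref not in predicted[ref.datasetType]:
        raise ValueError(f"Actual input {ref} was not predicted")
    actual.setdefault(ref.datasetType, []).append(ref)

calexp = DatasetType("calexp")
ref = DatasetRef(calexp, dataId=(("visit", 42),))
add_predicted(ref)
mark_used(ref)                    # fine: ref was predicted first
assert actual[calexp] == [ref]
```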