Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-26629: switch to calibration collections instead of the calibration_label dimension #148

Merged
merged 5 commits into from
Sep 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions python/lsst/pipe/base/butlerQuantumContext.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ def __init__(self, butler: Butler, quantum: Quantum):
def _get(self, ref):
if isinstance(ref, DeferredDatasetRef):
self._checkMembership(ref.datasetRef, self.allInputs)
return butler.getDeferred(ref.datasetRef)
return butler.getDirectDeferred(ref.datasetRef)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not super comfortable linking this to something that MUST have been done by some other class. At minimum you should throw an exception here if ref.id is None (or in the constructor), and add something to the documentation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getDirectDeferred will raise an exception if the ref.id is None.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, thanks to Tim's comment on the daf_butler PR. As we discussed out-of-band, I'm also uncomfortable with this relying on logic in ctrl_mpexec, but I think the problem is where that code lives, not what it does or guarantees. But in addition to that check for not-None ID (which we can defer to butler), I'll go add some documentation to the ButlerQuantumContext class docs and anywhere else I can find that resolved DatasetRefs are a precondition.


else:
self._checkMembership(ref, self.allInputs)
return butler.get(ref)
return butler.getDirect(ref)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getDirect already gets upset if it's not a resolved ref.


def _put(self, value, ref):
self._checkMembership(ref, self.allOutputs)
Expand Down
36 changes: 34 additions & 2 deletions python/lsst/pipe/base/connectionTypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
DatasetType,
DimensionUniverse,
Registry,
StorageClass,
)


Expand Down Expand Up @@ -92,6 +93,27 @@ def __get__(self, inst, klass):
# information provided by the connection class instance
return inst._connectionCache.setdefault(idSelf, self.__class__(**params))

def makeDatasetType(self, universe: DimensionUniverse,
                    parentStorageClass: Optional[StorageClass] = None):
    """Construct a true `DatasetType` instance with normalized dimensions.

    Parameters
    ----------
    universe : `lsst.daf.butler.DimensionUniverse`
        Set of all known dimensions to be used to normalize the dimension
        names specified in config.
    parentStorageClass : `lsst.daf.butler.StorageClass`, optional
        Parent storage class for component datasets; `None` otherwise.

    Returns
    -------
    datasetType : `DatasetType`
        The `DatasetType` defined by this connection.
    """
    # Base connections carry no dimensions of their own, so the dataset
    # type is built on the empty dimension graph of ``universe``.
    return DatasetType(self.name,
                       universe.empty,
                       self.storageClass,
                       parentStorageClass=parentStorageClass)


@dataclasses.dataclass(frozen=True)
class DimensionedConnection(BaseConnection):
Expand All @@ -110,8 +132,13 @@ class DimensionedConnection(BaseConnection):
dimensions : iterable of `str`
The `lsst.daf.butler.Butler` `lsst.daf.butler.Registry` dimensions used
to identify the dataset type identified by the specified name
isCalibration: `bool`, optional
`True` if this dataset type may be included in CALIBRATION-type
collections to associate it with a validity range, `False` (default)
otherwise.
"""
dimensions: typing.Iterable[str] = ()
isCalibration: bool = False

def __post_init__(self):
if isinstance(self.dimensions, str):
Expand All @@ -120,21 +147,26 @@ def __post_init__(self):
if not isinstance(self.dimensions, typing.Iterable):
raise TypeError("Dimensions must be iterable of dimensions")

def makeDatasetType(self, universe: DimensionUniverse):
def makeDatasetType(self, universe: DimensionUniverse,
parentStorageClass: Optional[StorageClass] = None):
"""Construct a true `DatasetType` instance with normalized dimensions.
Parameters
----------
universe : `lsst.daf.butler.DimensionUniverse`
Set of all known dimensions to be used to normalize the dimension
names specified in config.
parentStorageClass : `lsst.daf.butler.StorageClass`, optional
Parent storage class for component datasets; `None` otherwise.

Returns
-------
datasetType : `DatasetType`
The `DatasetType` defined by this connection.
"""
return DatasetType(self.name,
universe.extract(self.dimensions),
self.storageClass)
self.storageClass, isCalibration=self.isCalibration,
parentStorageClass=parentStorageClass)


@dataclasses.dataclass(frozen=True)
Expand Down
25 changes: 20 additions & 5 deletions python/lsst/pipe/base/graphBuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -707,22 +707,37 @@ def resolveDatasetRefs(self, registry, collections, run, commonDataIds, *, skipE
# These may have dimensions that extend beyond those we queried
# for originally, because we want to permit those data ID
# values to differ across quanta and dataset types.
# For example, the same quantum may have a flat and bias with
# a different calibration_label, or a refcat with a skypix
# value that overlaps the quantum's data ID's region, but not
# the user expression used for the initial query.
for datasetType in task.prerequisites:
lookupFunction = lookupFunctions.get(datasetType.name)
if lookupFunction is not None:
# PipelineTask has provided its own function to do the
# lookup. This always takes precedence.
refs = list(
lookupFunction(datasetType, registry, quantum.dataId, collections)
)
elif (datasetType.isCalibration()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My only comment about this block is that I had always intended to go back and make the else a free function so it was easier to use in a custom lookupFunction that wanted to extend that behavior. Perhaps you could do that and the same for the calibration bits if you think it is worth it. It's not too much code to copy for someone, and its use will be small, so if you don't think it's a good use of time I am fine with you not.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that's a good idea, but this ticket is too big and cumbersome for any more scope, and I definitely plan to revisit this block in the next month anyway.

and datasetType.dimensions <= quantum.dataId.graph
and quantum.dataId.graph.temporal):
# This is a master calibration lookup, which we have to
# handle specially because the query system can't do a
# temporal join on a non-dimension-based timespan yet.
timespan = quantum.dataId.timespan
try:
refs = [registry.findDataset(datasetType, quantum.dataId,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you be sure that this will only ever return one, is an exception raised if more than one could be found for a timespan? Does the certification process prevent this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, findDataset will raise in that case. The certification process is supposed to guarantee that is impossible for any calibration I can think of, but it's not something we can guarantee at the level of database constraints. So the exception basically says, "someone probably put together a malformed calibration repo".

collections=collections,
timespan=timespan)]
except KeyError:
# This dataset type is not present in the registry,
# which just means there are no datasets here.
refs = []
else:
# Most general case.
refs = list(registry.queryDatasets(datasetType,
collections=collections,
dataId=quantum.dataId,
deduplicate=True).expanded())
quantum.prerequisites[datasetType].update({ref.dataId: ref for ref in refs})
quantum.prerequisites[datasetType].update({ref.dataId: ref for ref in refs
if ref is not None})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If my ticket lands first, this is broken; in fact, looking at this makes me feel I need to go look back at my ticket...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevermind, this is a QuantumScaffolding

# Actually remove any quanta that we decided to skip above.
if dataIdsToSkip:
_LOG.debug("Pruning %d quanta for task with label '%s' because all of their outputs exist.",
Expand Down
14 changes: 8 additions & 6 deletions python/lsst/pipe/base/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,14 +508,16 @@ def makeDatasetTypesSet(connectionType, freeze=True):
compositeName, componentName = DatasetType.splitDatasetTypeName(c.name)
parentStorageClass = DatasetType.PlaceholderParentStorageClass \
if componentName else None
datasetType = DatasetType(c.name, registry.dimensions.extract(dimensions),
c.storageClass,
parentStorageClass=parentStorageClass)
datasetType = c.makeDatasetType(
registry.dimensions,
parentStorageClass=parentStorageClass
)
registryDatasetType = datasetType
else:
datasetType = DatasetType(c.name, registry.dimensions.extract(dimensions),
c.storageClass,
parentStorageClass=registryDatasetType.parentStorageClass)
datasetType = c.makeDatasetType(
registry.dimensions,
parentStorageClass=registryDatasetType.parentStorageClass
)

if registryDatasetType and datasetType != registryDatasetType:
raise ValueError(f"Supplied dataset type ({datasetType}) inconsistent with "
Expand Down
36 changes: 36 additions & 0 deletions python/lsst/pipe/base/testUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,41 @@ def _refFromConnection(butler, connection, dataId, **kwargs):
from e


def _resolveTestQuantumInputs(butler, quantum):
"""Look up all input datasets a test quantum in the `Registry` to resolve
all `DatasetRef` objects (i.e. ensure they have not-`None` ``id`` and
``run`` attributes).

Parameters
----------
quantum : `~lsst.daf.butler.Quantum`
Single Quantum instance.
butler : `~lsst.daf.butler.Butler`
Data butler.
"""
# TODO (DM-26819): This function is a direct copy of
# `lsst.ctrl.mpexec.SingleQuantumExecutor.updateQuantumInputs`, but the
# `runTestQuantum` function that calls it is essentially duplicating logic
# in that class as well (albeit not verbatim). We should probably move
# `SingleQuantumExecutor` to ``pipe_base`` and see if it is directly usable
# in test code instead of having these classes at all.
for refsForDatasetType in quantum.predictedInputs.values():
newRefsForDatasetType = []
for ref in refsForDatasetType:
if ref.id is None:
resolvedRef = butler.registry.findDataset(ref.datasetType, ref.dataId,
collections=butler.collections)
if resolvedRef is None:
raise ValueError(
f"Cannot find {ref.datasetType.name} with id {ref.dataId} "
f"in collections {butler.collections}."
)
newRefsForDatasetType.append(resolvedRef)
else:
newRefsForDatasetType.append(ref)
refsForDatasetType[:] = newRefsForDatasetType
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes too with my ticket, the race is on, maybe I should have held back these comments? :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least (I imagine) it'll be an easy change in either case.



def runTestQuantum(task, butler, quantum, mockRun=True):
"""Run a PipelineTask on a Quantum.

Expand All @@ -185,6 +220,7 @@ def runTestQuantum(task, butler, quantum, mockRun=True):
If ``mockRun`` is set, the mock that replaced ``run``. This object can
be queried for the arguments ``runQuantum`` passed to ``run``.
"""
_resolveTestQuantumInputs(butler, quantum)
butlerQc = ButlerQuantumContext(butler, quantum)
connections = task.config.ConnectionsClass(config=task.config)
inputRefs, outputRefs = connections.buildDatasetRefs(quantum)
Expand Down
12 changes: 3 additions & 9 deletions tests/test_pipelineTask.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,10 @@ def __init__(self):
self.datasets = {}
self.registry = SimpleNamespace(dimensions=DimensionUniverse())

def get(self, datasetRefOrType, dataId=None):
if isinstance(datasetRefOrType, DatasetRef):
dataId = datasetRefOrType.dataId
dsTypeName = datasetRefOrType.datasetType.name
else:
dsTypeName = datasetRefOrType
key = dataId
dsdata = self.datasets.get(dsTypeName)
def getDirect(self, ref):
    """Return the in-memory dataset stored for a resolved ``ref``.

    Looks up the dataset-type name, then the data ID, in the nested
    ``self.datasets`` mapping; returns `None` when either level is missing.
    """
    perType = self.datasets.get(ref.datasetType.name)
    return perType.get(ref.dataId) if perType else None

def put(self, inMemoryDataset, dsRef, producer=None):
Expand Down