Skip to content

Commit

Permalink
Remove UnresolvedRefWarning filters (DM-39122)
Browse files Browse the repository at this point in the history
Additionally PreExecInit is cleaned up to always expect resolved refs
in quantum graph and use those refs to store initOutputs. `CmdLineFwk`
now checks that output runs on command line and in graph are consistent.
  • Loading branch information
andy-slac committed May 13, 2023
1 parent 45d0adf commit f3d8804
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 235 deletions.
11 changes: 11 additions & 0 deletions python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,17 @@ def runPipeline(
Data Butler instance, if not defined then new instance is made
using command line options.
"""
# Check that output run defined on command line is consistent with
# quantum graph.
if args.output_run and graph.metadata:
graph_output_run = graph.metadata.get("output_run", args.output_run)
if graph_output_run != args.output_run:
raise ValueError(

Check warning on line 687 in python/lsst/ctrl/mpexec/cmdLineFwk.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/cmdLineFwk.py#L687

Added line #L687 was not covered by tests
f"Output run defined on command line ({args.output_run}) has to be "
f"identical to graph metadata ({graph_output_run}). "
"To update graph metadata run `pipetask update-graph-run` command."
)

# make sure that --extend-run always enables --skip-existing
if args.extend_run:
args.skip_existing = True
Expand Down
13 changes: 4 additions & 9 deletions python/lsst/ctrl/mpexec/mock_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import logging
import warnings
from typing import Any, List, Optional, Union

from lsst.daf.butler import Butler, DatasetRef, Quantum, UnresolvedRefWarning
from lsst.daf.butler import Butler, DatasetRef, Quantum
from lsst.pex.config import Field
from lsst.pipe.base import (
ButlerQuantumContext,
Expand Down Expand Up @@ -108,16 +107,12 @@ def _put(self, value: Any, ref: DatasetRef) -> None:
# docstring is inherited from the base class

mockDatasetType = self.registry.getDatasetType(self.mockDatasetTypeName(ref.datasetType.name))
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=UnresolvedRefWarning)
mockRef = DatasetRef(mockDatasetType, ref.dataId)
mockRef = DatasetRef(mockDatasetType, ref.dataId, run=ref.run)
value.setdefault("ref", {}).update(datasetType=mockDatasetType.name)
self.butler.put(value, mockRef)

# also "store" non-mock refs, make sure it is not resolved.
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=UnresolvedRefWarning)
self.registry._importDatasets([ref.unresolved()])
# Also "store" non-mock refs.
self.registry._importDatasets([ref])

def _checkMembership(self, ref: Union[List[DatasetRef], DatasetRef], inout: set) -> None:
# docstring is inherited from the base class
Expand Down
8 changes: 1 addition & 7 deletions python/lsst/ctrl/mpexec/mpGraphExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,10 @@
import sys
import threading
import time
import warnings
from collections.abc import Iterable
from enum import Enum
from typing import Literal, Optional

from lsst.daf.butler import UnresolvedRefWarning
from lsst.daf.butler.cli.cliLog import CliLog
from lsst.pipe.base import InvalidQuantumError, TaskDef
from lsst.pipe.base.graph.graph import QuantumGraph, QuantumNode
Expand Down Expand Up @@ -158,11 +156,7 @@ def _executeJob(
# re-initialize logging
CliLog.replayConfigState(logConfigState)

with warnings.catch_warnings():
# Loading the pickle file can trigger unresolved ref warnings
# so we must hide them.
warnings.simplefilter("ignore", category=UnresolvedRefWarning)
quantum = pickle.loads(quantum_pickle)
quantum = pickle.loads(quantum_pickle)
try:
quantumExecutor.execute(taskDef, quantum)
finally:
Expand Down
224 changes: 69 additions & 155 deletions python/lsst/ctrl/mpexec/preExecInit.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
# -----------------------------
# Imports for other modules --
# -----------------------------
from lsst.daf.butler import DataCoordinate, DatasetRef, DatasetType
from lsst.daf.butler import DatasetRef, DatasetType
from lsst.daf.butler.registry import ConflictingDefinitionError
from lsst.pipe.base import PipelineDatasetTypes
from lsst.utils.packages import Packages
Expand Down Expand Up @@ -83,9 +83,10 @@ class PreExecInitBase(abc.ABC):
depend on Butler type.
"""

def __init__(self, butler: LimitedButler, taskFactory: TaskFactory):
def __init__(self, butler: LimitedButler, taskFactory: TaskFactory, extendRun: bool):
self.butler = butler
self.taskFactory = taskFactory
self.extendRun = extendRun

def initialize(
self,
Expand Down Expand Up @@ -166,14 +167,15 @@ def saveInitOutputs(self, graph: QuantumGraph) -> None:
new data.
"""
_LOG.debug("Will save InitOutputs for all tasks")
for taskDef in graph.iterTaskGraph():
init_input_refs = self.find_init_input_refs(taskDef, graph)
for taskDef in self._task_iter(graph):
init_input_refs = graph.initInputRefs(taskDef) or []
task = self.taskFactory.makeTask(taskDef, self.butler, init_input_refs)
for name in taskDef.connections.initOutputs:
attribute = getattr(taskDef.connections, name)
obj_from_store, init_output_ref = self.find_init_output(taskDef, attribute.name, graph)
init_output_refs = graph.initOutputRefs(taskDef) or []
init_output_ref, obj_from_store = self._find_dataset(init_output_refs, attribute.name)
if init_output_ref is None:
raise ValueError(f"Cannot find or make dataset reference for init output {name}")
raise ValueError(f"Cannot find dataset reference for init output {name} in a graph")

Check warning on line 178 in python/lsst/ctrl/mpexec/preExecInit.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/preExecInit.py#L178

Added line #L178 was not covered by tests
init_output_var = getattr(task, name)

if obj_from_store is not None:
Expand Down Expand Up @@ -220,10 +222,16 @@ def logConfigMismatch(msg: str) -> None:
_LOG.debug("Will save Configs for all tasks")
# start transaction to rollback any changes on exceptions
with self.transaction():
for taskDef in graph.iterTaskGraph():
config_name = taskDef.configDatasetName
for taskDef in self._task_iter(graph):
# Config dataset ref is stored in task init outputs, but it
# may also be missing.
task_output_refs = graph.initOutputRefs(taskDef)
if task_output_refs is None:
continue

Check warning on line 230 in python/lsst/ctrl/mpexec/preExecInit.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/preExecInit.py#L230

Added line #L230 was not covered by tests

old_config, dataset_ref = self.find_init_output(taskDef, taskDef.configDatasetName, graph)
config_ref, old_config = self._find_dataset(task_output_refs, taskDef.configDatasetName)
if config_ref is None:
continue

Check warning on line 234 in python/lsst/ctrl/mpexec/preExecInit.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/preExecInit.py#L234

Added line #L234 was not covered by tests

if old_config is not None:
if not taskDef.config.compare(old_config, shortcut=False, output=logConfigMismatch):
Expand All @@ -232,9 +240,10 @@ def logConfigMismatch(msg: str) -> None:
"butler; tasks configurations must be consistent within the same run collection"
)
else:
# butler will raise exception if dataset is already there
_LOG.debug("Saving Config for task=%s dataset type=%s", taskDef.label, config_name)
self.butler.put(taskDef.config, dataset_ref)
_LOG.debug(
"Saving Config for task=%s dataset type=%s", taskDef.label, taskDef.configDatasetName
)
self.butler.put(taskDef.config, config_ref)

def savePackageVersions(self, graph: QuantumGraph) -> None:
"""Write versions of software packages to butler.
Expand All @@ -254,7 +263,14 @@ def savePackageVersions(self, graph: QuantumGraph) -> None:

# start transaction to rollback any changes on exceptions
with self.transaction():
old_packages, dataset_ref = self.find_packages(graph)
# Packages dataset ref is stored in the graph's global init outputs,
# but it may also be missing.

packages_ref, old_packages = self._find_dataset(
graph.globalInitOutputRefs(), PipelineDatasetTypes.packagesDatasetName
)
if packages_ref is None:
return

Check warning on line 273 in python/lsst/ctrl/mpexec/preExecInit.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/preExecInit.py#L273

Added line #L273 was not covered by tests

if old_packages is not None:
# Note that because we can only detect python modules that have
Expand All @@ -270,81 +286,55 @@ def savePackageVersions(self, graph: QuantumGraph) -> None:
old_packages.update(packages)
# have to remove existing dataset first, butler has no
# replace option.
self.butler.pruneDatasets([dataset_ref], unstore=True, purge=True)
self.butler.put(old_packages, dataset_ref)
self.butler.pruneDatasets([packages_ref], unstore=True, purge=True)
self.butler.put(old_packages, packages_ref)

Check warning on line 290 in python/lsst/ctrl/mpexec/preExecInit.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/preExecInit.py#L289-L290

Added lines #L289 - L290 were not covered by tests
else:
self.butler.put(packages, dataset_ref)
self.butler.put(packages, packages_ref)

@abc.abstractmethod
def find_init_input_refs(self, taskDef: TaskDef, graph: QuantumGraph) -> Iterable[DatasetRef]:
"""Return the list of resolved dataset references for task init inputs.
def _find_dataset(
self, refs: Iterable[DatasetRef], dataset_type: str
) -> tuple[DatasetRef | None, Any | None]:
"""Find a ref with a given dataset type name in a list of references
and try to retrieve its data from butler.
Parameters
----------
taskDef : `~lsst.pipe.base.TaskDef`
Pipeline task definition.
graph : `~lsst.pipe.base.QuantumGraph`
Quantum graph.
Returns
-------
refs : `~collections.abc.Iterable` [`~lsst.daf.butler.DatasetRef`]
Resolved dataset references.
"""
raise NotImplementedError()

@abc.abstractmethod
def find_init_output(
self, taskDef: TaskDef, dataset_type: str, graph: QuantumGraph
) -> tuple[Any | None, DatasetRef]:
"""Find task init output for given dataset type.
Parameters
----------
taskDef : `~lsst.pipe.base.TaskDef`
Pipeline task definition.
refs : `~collections.abc.Iterable` [ `DatasetRef` ]
References to check for matching dataset type.
dataset_type : `str`
Dataset type name.
graph : `~lsst.pipe.base.QuantumGraph`
Quantum graph.
Name of a dataset type to look for.
Returns
-------
data
Existing init output object retrieved from butler, `None` if butler
has no existing object.
ref : `~lsst.daf.butler.DatasetRef`
Resolved reference for init output to be stored in butler.
Raises
------
MissingReferenceError
Raised if reference cannot be found or generated.
ref : `DatasetRef` or `None`
Dataset reference or `None` if there is no matching dataset type.
data : `Any`
An existing object extracted from butler, `None` if ``ref`` is
`None` or if there is no existing object for that reference.
"""
raise NotImplementedError()

@abc.abstractmethod
def find_packages(self, graph: QuantumGraph) -> tuple[Packages | None, DatasetRef]:
"""Find packages information.
Parameters
----------
graph : `~lsst.pipe.base.QuantumGraph`
Quantum graph.
Returns
-------
packages : `lsst.utils.packages.Packages` or `None`
Existing packages data retrieved from butler, or `None`.
ref : `~lsst.daf.butler.DatasetRef`
Resolved reference for packages to be stored in butler.
Raises
------
MissingReferenceError
Raised if reference cannot be found or generated.
ref: DatasetRef | None = None
for ref in refs:
if ref.datasetType.name == dataset_type:
break
else:
return None, None

Check warning on line 320 in python/lsst/ctrl/mpexec/preExecInit.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/preExecInit.py#L320

Added line #L320 was not covered by tests

try:
data = self.butler.get(ref)
if data is not None and not self.extendRun:
# It must not exist unless we are extending run.
raise ConflictingDefinitionError(f"Dataset {ref} already exists in butler")
except (LookupError, FileNotFoundError):
data = None
return ref, data

def _task_iter(self, graph: QuantumGraph) -> Iterator[TaskDef]:
"""Iterate over TaskDefs in a graph, return only tasks that have one or
more associated quanta.
"""
raise NotImplementedError()
for taskDef in graph.iterTaskGraph():
if graph.getNumberOfQuantaForTask(taskDef) > 0:
yield taskDef

@contextmanager
def transaction(self) -> Iterator[None]:
Expand Down Expand Up @@ -377,9 +367,8 @@ class PreExecInit(PreExecInitBase):
"""

def __init__(self, butler: Butler, taskFactory: TaskFactory, extendRun: bool = False, mock: bool = False):
super().__init__(butler, taskFactory)
super().__init__(butler, taskFactory, extendRun)
self.full_butler = butler
self.extendRun = extendRun
self.mock = mock
if self.extendRun and self.full_butler.run is None:
raise RuntimeError(
Expand Down Expand Up @@ -487,54 +476,6 @@ def _check_compatibility(datasetType: DatasetType, expected: DatasetType, is_inp
"passing `--register-dataset-types` option to `pipetask run`."
)

def find_init_input_refs(self, taskDef: TaskDef, graph: QuantumGraph) -> Iterable[DatasetRef]:
# docstring inherited
refs: list[DatasetRef] = []
for name in taskDef.connections.initInputs:
attribute = getattr(taskDef.connections, name)
dataId = DataCoordinate.makeEmpty(self.full_butler.dimensions)
dataset_type = DatasetType(attribute.name, graph.universe.empty, attribute.storageClass)
ref = self.full_butler.registry.findDataset(dataset_type, dataId)
if ref is None:
raise ValueError(f"InitInput does not exist in butler for dataset type {dataset_type}")
refs.append(ref)
return refs

def find_init_output(
self, taskDef: TaskDef, dataset_type_name: str, graph: QuantumGraph
) -> tuple[Any | None, DatasetRef]:
# docstring inherited
dataset_type = self.full_butler.registry.getDatasetType(dataset_type_name)
dataId = DataCoordinate.makeEmpty(self.full_butler.dimensions)
return self._find_existing(dataset_type, dataId)

def find_packages(self, graph: QuantumGraph) -> tuple[Packages | None, DatasetRef]:
# docstring inherited
dataset_type = self.full_butler.registry.getDatasetType(PipelineDatasetTypes.packagesDatasetName)
dataId = DataCoordinate.makeEmpty(self.full_butler.dimensions)
return self._find_existing(dataset_type, dataId)

def _find_existing(
self, dataset_type: DatasetType, dataId: DataCoordinate
) -> tuple[Any | None, DatasetRef]:
"""Make a reference of a given dataset type and try to retrieve it from
butler. If not found then generate new resolved reference.
"""
run = self.full_butler.run
assert run is not None

ref = self.full_butler.registry.findDataset(dataset_type, dataId, collections=[run])
if self.extendRun and ref is not None:
try:
config = self.butler.get(ref)
return config, ref
except (LookupError, FileNotFoundError):
return None, ref
else:
# make new resolved dataset ref
ref = DatasetRef(dataset_type, dataId, run=run)
return None, ref


class PreExecInitLimited(PreExecInitBase):
"""Initialization of registry for QuantumGraph execution.
Expand All @@ -551,36 +492,9 @@ class PreExecInitLimited(PreExecInitBase):
"""

def __init__(self, butler: LimitedButler, taskFactory: TaskFactory):
super().__init__(butler, taskFactory)
super().__init__(butler, taskFactory, False)

Check warning on line 495 in python/lsst/ctrl/mpexec/preExecInit.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/mpexec/preExecInit.py#L495

Added line #L495 was not covered by tests

def initializeDatasetTypes(self, graph: QuantumGraph, registerDatasetTypes: bool = False) -> None:
# docstring inherited
# With LimitedButler we never create or check dataset types.
pass

def find_init_input_refs(self, taskDef: TaskDef, graph: QuantumGraph) -> Iterable[DatasetRef]:
# docstring inherited
return graph.initInputRefs(taskDef) or []

def find_init_output(
self, taskDef: TaskDef, dataset_type: str, graph: QuantumGraph
) -> tuple[Any | None, DatasetRef]:
# docstring inherited
return self._find_existing(graph.initOutputRefs(taskDef) or [], dataset_type)

def find_packages(self, graph: QuantumGraph) -> tuple[Packages | None, DatasetRef]:
# docstring inherited
return self._find_existing(graph.globalInitOutputRefs(), PipelineDatasetTypes.packagesDatasetName)

def _find_existing(self, refs: Iterable[DatasetRef], dataset_type: str) -> tuple[Any | None, DatasetRef]:
"""Find a reference of a given dataset type in the list of references
and try to retrieve it from butler.
"""
for ref in refs:
if ref.datasetType.name == dataset_type:
try:
data = self.butler.get(ref)
return data, ref
except (LookupError, FileNotFoundError):
return None, ref
raise MissingReferenceError(f"Failed to find reference for dataset type {dataset_type}")

0 comments on commit f3d8804

Please sign in to comment.