Skip to content

Commit

Permalink
Merge pull request #232 from lsst/tickets/DM-37703
Browse files Browse the repository at this point in the history
DM-37703: Catch unresolved refs warnings and some workarounds
  • Loading branch information
timj committed Apr 19, 2023
2 parents bb2d212 + e3c9915 commit 3a2e5ed
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 16 deletions.
15 changes: 10 additions & 5 deletions python/lsst/ctrl/mpexec/mock_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import logging
import warnings
from typing import Any, List, Optional, Union

from lsst.daf.butler import Butler, DatasetRef, Quantum
from lsst.daf.butler import Butler, DatasetRef, Quantum, UnresolvedRefWarning
from lsst.pex.config import Field
from lsst.pipe.base import (
ButlerQuantumContext,
Expand Down Expand Up @@ -84,9 +85,9 @@ def _get(self, ref: Optional[Union[DeferredDatasetRef, DatasetRef]]) -> Any:
mockDatasetTypeName = self.mockDatasetTypeName(datasetType.name)

try:
# Try to use the mock DatasetType if it is defined.
mockDatasetType = self.butler.registry.getDatasetType(mockDatasetTypeName)
ref = DatasetRef(mockDatasetType, ref.dataId)
data = self.butler.get(ref)
data = self.butler.get(mockDatasetType, ref.dataId)
except KeyError:
data = super()._get(ref)
# If the input as an actual non-mock data then we want to replace
Expand All @@ -107,12 +108,16 @@ def _put(self, value: Any, ref: DatasetRef) -> None:
# docstring is inherited from the base class

mockDatasetType = self.registry.getDatasetType(self.mockDatasetTypeName(ref.datasetType.name))
mockRef = DatasetRef(mockDatasetType, ref.dataId)
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=UnresolvedRefWarning)
mockRef = DatasetRef(mockDatasetType, ref.dataId)
value.setdefault("ref", {}).update(datasetType=mockDatasetType.name)
self.butler.put(value, mockRef)

# also "store" non-mock refs, make sure it is not resolved.
self.registry._importDatasets([ref.unresolved()])
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=UnresolvedRefWarning)
self.registry._importDatasets([ref.unresolved()])

def _checkMembership(self, ref: Union[List[DatasetRef], DatasetRef], inout: set) -> None:
# docstring is inherited from the base class
Expand Down
8 changes: 7 additions & 1 deletion python/lsst/ctrl/mpexec/mpGraphExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@
import sys
import threading
import time
import warnings
from collections.abc import Iterable
from enum import Enum
from typing import Literal, Optional

from lsst.daf.butler import UnresolvedRefWarning
from lsst.daf.butler.cli.cliLog import CliLog
from lsst.pipe.base import InvalidQuantumError, TaskDef
from lsst.pipe.base.graph.graph import QuantumGraph, QuantumNode
Expand Down Expand Up @@ -156,7 +158,11 @@ def _executeJob(
# re-initialize logging
CliLog.replayConfigState(logConfigState)

quantum = pickle.loads(quantum_pickle)
with warnings.catch_warnings():
# Loading the pickle file can trigger unresolved ref warnings
# so we must hide them.
warnings.simplefilter("ignore", category=UnresolvedRefWarning)
quantum = pickle.loads(quantum_pickle)
try:
quantumExecutor.execute(taskDef, quantum)
finally:
Expand Down
5 changes: 2 additions & 3 deletions python/lsst/ctrl/mpexec/preExecInit.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
# -----------------------------
# Imports for other modules --
# -----------------------------
from lsst.daf.butler import DataCoordinate, DatasetIdFactory, DatasetRef, DatasetType
from lsst.daf.butler import DataCoordinate, DatasetRef, DatasetType
from lsst.daf.butler.registry import ConflictingDefinitionError
from lsst.pipe.base import PipelineDatasetTypes
from lsst.utils.packages import Packages
Expand Down Expand Up @@ -532,8 +532,7 @@ def _find_existing(
return None, ref
else:
# make new resolved dataset ref
ref = DatasetRef(dataset_type, dataId)
ref = DatasetIdFactory().resolveRef(ref, run)
ref = DatasetRef(dataset_type, dataId, run=run)
return None, ref


Expand Down
29 changes: 23 additions & 6 deletions python/lsst/ctrl/mpexec/singleQuantumExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,21 @@
import os
import sys
import time
import warnings
from collections import defaultdict
from collections.abc import Callable
from itertools import chain
from typing import Any, Optional, Union

from lsst.daf.butler import Butler, DatasetRef, DatasetType, LimitedButler, NamedKeyDict, Quantum
from lsst.daf.butler import (
Butler,
DatasetRef,
DatasetType,
LimitedButler,
NamedKeyDict,
Quantum,
UnresolvedRefWarning,
)
from lsst.pipe.base import (
AdjustQuantumHelper,
ButlerQuantumContext,
Expand Down Expand Up @@ -171,7 +180,9 @@ def _resolve_ref(self, ref: DatasetRef, collections: Any = None) -> DatasetRef |
if self.butler is not None:
# If running with full butler, need to re-resolve it in case
# collections are different.
ref = ref.unresolved()
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=UnresolvedRefWarning)
ref = ref.unresolved()
return self.butler.registry.findDataset(ref.datasetType, ref.dataId, collections=collections)
else:
# In case of QBB all refs must be resolved already, do not check.
Expand Down Expand Up @@ -485,11 +496,15 @@ def updatedQuantumInputs(
if self.butler.datastore.exists(resolvedRef):
newRefsForDatasetType.append(resolvedRef)
else:
mockRef = DatasetRef(mockDatasetType, ref.dataId)
resolvedMockRef = self.butler.registry.findDataset(
mockRef.datasetType, mockRef.dataId, collections=self.butler.collections
mockDatasetType, ref.dataId, collections=self.butler.collections
)
_LOG.debug(
"mockRef=(%s, %s) resolvedMockRef=%s",
mockDatasetType,
ref.dataId,
resolvedMockRef,
)
_LOG.debug("mockRef=%s resolvedMockRef=%s", mockRef, resolvedMockRef)
if resolvedMockRef is not None and self.butler.datastore.exists(resolvedMockRef):
_LOG.debug("resolvedMockRef dataset exists")
newRefsForDatasetType.append(resolvedRef)
Expand Down Expand Up @@ -587,7 +602,9 @@ def writeMetadata(
# have to ignore that because may be overriding run
# collection.
if ref.id is not None:
ref = ref.unresolved()
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=UnresolvedRefWarning)
ref = ref.unresolved()
self.butler.put(metadata, ref)
else:
limited_butler.put(metadata, ref)
Expand Down
6 changes: 5 additions & 1 deletion tests/test_cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,11 @@ def _makeQGraph():
taskName=_TASK_CLASS,
inputs={
fakeDSType: [
DatasetRef(fakeDSType, DataCoordinate.standardize({"A": 1, "B": 2}, universe=universe))
DatasetRef(
fakeDSType,
DataCoordinate.standardize({"A": 1, "B": 2}, universe=universe),
run="fake_run",
)
]
},
)
Expand Down

0 comments on commit 3a2e5ed

Please sign in to comment.