Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-37703: Catch unresolved refs warnings and some workarounds #232

Merged
merged 3 commits into from
Apr 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 10 additions & 5 deletions python/lsst/ctrl/mpexec/mock_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import logging
import warnings
from typing import Any, List, Optional, Union

from lsst.daf.butler import Butler, DatasetRef, Quantum
from lsst.daf.butler import Butler, DatasetRef, Quantum, UnresolvedRefWarning
from lsst.pex.config import Field
from lsst.pipe.base import (
ButlerQuantumContext,
Expand Down Expand Up @@ -84,9 +85,9 @@ def _get(self, ref: Optional[Union[DeferredDatasetRef, DatasetRef]]) -> Any:
mockDatasetTypeName = self.mockDatasetTypeName(datasetType.name)

try:
# Try to use the mock DatasetType if it is defined.
mockDatasetType = self.butler.registry.getDatasetType(mockDatasetTypeName)
ref = DatasetRef(mockDatasetType, ref.dataId)
data = self.butler.get(ref)
data = self.butler.get(mockDatasetType, ref.dataId)
except KeyError:
data = super()._get(ref)
# If the input as an actual non-mock data then we want to replace
Expand All @@ -107,12 +108,16 @@ def _put(self, value: Any, ref: DatasetRef) -> None:
# docstring is inherited from the base class

mockDatasetType = self.registry.getDatasetType(self.mockDatasetTypeName(ref.datasetType.name))
mockRef = DatasetRef(mockDatasetType, ref.dataId)
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=UnresolvedRefWarning)
mockRef = DatasetRef(mockDatasetType, ref.dataId)
value.setdefault("ref", {}).update(datasetType=mockDatasetType.name)
self.butler.put(value, mockRef)

# also "store" non-mock refs, make sure it is not resolved.
self.registry._importDatasets([ref.unresolved()])
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=UnresolvedRefWarning)
self.registry._importDatasets([ref.unresolved()])

def _checkMembership(self, ref: Union[List[DatasetRef], DatasetRef], inout: set) -> None:
# docstring is inherited from the base class
Expand Down
8 changes: 7 additions & 1 deletion python/lsst/ctrl/mpexec/mpGraphExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@
import sys
import threading
import time
import warnings
from collections.abc import Iterable
from enum import Enum
from typing import Literal, Optional

from lsst.daf.butler import UnresolvedRefWarning
from lsst.daf.butler.cli.cliLog import CliLog
from lsst.pipe.base import InvalidQuantumError, TaskDef
from lsst.pipe.base.graph.graph import QuantumGraph, QuantumNode
Expand Down Expand Up @@ -156,7 +158,11 @@ def _executeJob(
# re-initialize logging
CliLog.replayConfigState(logConfigState)

quantum = pickle.loads(quantum_pickle)
with warnings.catch_warnings():
# Loading the pickle file can trigger unresolved ref warnings
# so we must hide them.
warnings.simplefilter("ignore", category=UnresolvedRefWarning)
quantum = pickle.loads(quantum_pickle)
try:
quantumExecutor.execute(taskDef, quantum)
finally:
Expand Down
5 changes: 2 additions & 3 deletions python/lsst/ctrl/mpexec/preExecInit.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
# -----------------------------
# Imports for other modules --
# -----------------------------
from lsst.daf.butler import DataCoordinate, DatasetIdFactory, DatasetRef, DatasetType
from lsst.daf.butler import DataCoordinate, DatasetRef, DatasetType
from lsst.daf.butler.registry import ConflictingDefinitionError
from lsst.pipe.base import PipelineDatasetTypes
from lsst.utils.packages import Packages
Expand Down Expand Up @@ -532,8 +532,7 @@ def _find_existing(
return None, ref
else:
# make new resolved dataset ref
ref = DatasetRef(dataset_type, dataId)
ref = DatasetIdFactory().resolveRef(ref, run)
ref = DatasetRef(dataset_type, dataId, run=run)
return None, ref


Expand Down
29 changes: 23 additions & 6 deletions python/lsst/ctrl/mpexec/singleQuantumExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,21 @@
import os
import sys
import time
import warnings
from collections import defaultdict
from collections.abc import Callable
from itertools import chain
from typing import Any, Optional, Union

from lsst.daf.butler import Butler, DatasetRef, DatasetType, LimitedButler, NamedKeyDict, Quantum
from lsst.daf.butler import (
Butler,
DatasetRef,
DatasetType,
LimitedButler,
NamedKeyDict,
Quantum,
UnresolvedRefWarning,
)
from lsst.pipe.base import (
AdjustQuantumHelper,
ButlerQuantumContext,
Expand Down Expand Up @@ -171,7 +180,9 @@ def _resolve_ref(self, ref: DatasetRef, collections: Any = None) -> DatasetRef |
if self.butler is not None:
# If running with full butler, need to re-resolve it in case
# collections are different.
ref = ref.unresolved()
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=UnresolvedRefWarning)
ref = ref.unresolved()
return self.butler.registry.findDataset(ref.datasetType, ref.dataId, collections=collections)
else:
# In case of QBB all refs must be resolved already, do not check.
Expand Down Expand Up @@ -485,11 +496,15 @@ def updatedQuantumInputs(
if self.butler.datastore.exists(resolvedRef):
newRefsForDatasetType.append(resolvedRef)
else:
mockRef = DatasetRef(mockDatasetType, ref.dataId)
resolvedMockRef = self.butler.registry.findDataset(
mockRef.datasetType, mockRef.dataId, collections=self.butler.collections
mockDatasetType, ref.dataId, collections=self.butler.collections
)
_LOG.debug(
"mockRef=(%s, %s) resolvedMockRef=%s",
mockDatasetType,
ref.dataId,
resolvedMockRef,
)
_LOG.debug("mockRef=%s resolvedMockRef=%s", mockRef, resolvedMockRef)
if resolvedMockRef is not None and self.butler.datastore.exists(resolvedMockRef):
_LOG.debug("resolvedMockRef dataset exists")
newRefsForDatasetType.append(resolvedRef)
Expand Down Expand Up @@ -587,7 +602,9 @@ def writeMetadata(
# have to ignore that because may be overriding run
# collection.
if ref.id is not None:
ref = ref.unresolved()
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=UnresolvedRefWarning)
ref = ref.unresolved()
self.butler.put(metadata, ref)
else:
limited_butler.put(metadata, ref)
Expand Down
6 changes: 5 additions & 1 deletion tests/test_cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,11 @@ def _makeQGraph():
taskName=_TASK_CLASS,
inputs={
fakeDSType: [
DatasetRef(fakeDSType, DataCoordinate.standardize({"A": 1, "B": 2}, universe=universe))
DatasetRef(
fakeDSType,
DataCoordinate.standardize({"A": 1, "B": 2}, universe=universe),
run="fake_run",
)
]
},
)
Expand Down