Merge pull request #287 from lsst/tickets/DM-40441
DM-40441: first batch of deprecations from RFC-949
TallJimbo committed Apr 8, 2024
2 parents 716b37d + fab8c00 commit d04c916
Showing 20 changed files with 641 additions and 414 deletions.
2 changes: 1 addition & 1 deletion python/lsst/ctrl/mpexec/cli/script/qgraph.py
@@ -28,7 +28,7 @@
import logging
from types import SimpleNamespace

from lsst.pipe.base.graphBuilder import DatasetQueryConstraintVariant
from lsst.pipe.base.all_dimensions_quantum_graph_builder import DatasetQueryConstraintVariant

from ... import CmdLineFwk

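The only functional change in this file is the import's new home: DatasetQueryConstraintVariant now lives in the same module as the new quantum-graph builder. A minimal sketch of the relocated import in use; the fromExpression constructor and the "all" semantics described in the comment are assumptions about the pipe_base API, not something this diff shows:

```python
from lsst.pipe.base.all_dimensions_quantum_graph_builder import DatasetQueryConstraintVariant

# Parse a user-supplied constraint expression into a variant. "all" is assumed
# to mean: constrain the initial data ID query on every overall-input dataset
# type of the pipeline.
constraint = DatasetQueryConstraintVariant.fromExpression("all")
```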
70 changes: 37 additions & 33 deletions python/lsst/ctrl/mpexec/cmdLineFwk.py
@@ -39,7 +39,7 @@
import getpass
import logging
import shutil
from collections.abc import Iterable, Mapping, Sequence
from collections.abc import Mapping, Sequence
from types import SimpleNamespace

import astropy.units as u
@@ -50,7 +50,6 @@
CollectionType,
Config,
DatasetId,
DatasetRef,
DatasetType,
DimensionUniverse,
LimitedButler,
@@ -65,16 +64,17 @@
from lsst.daf.butler.registry.wildcards import CollectionWildcard
from lsst.pipe.base import (
ExecutionResources,
GraphBuilder,
Instrument,
Pipeline,
PipelineDatasetTypes,
PipelineGraph,
QuantumGraph,
TaskDef,
TaskFactory,
buildExecutionButler,
)
from lsst.pipe.base.all_dimensions_quantum_graph_builder import AllDimensionsQuantumGraphBuilder
from lsst.pipe.base.pipeline_graph import NodeType
from lsst.utils import doImportType
from lsst.utils.logging import getLogger
from lsst.utils.threads import disable_implicit_threading

from .dotTools import graph2dot, pipeline2dot
@@ -87,7 +87,7 @@
# Local non-exported definitions --
# ----------------------------------

_LOG = logging.getLogger(__name__)
_LOG = getLogger(__name__)


class _OutputChainedCollectionInfo:
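The logger swap above is what enables the _LOG.verbose(...) calls added further down in this file: lsst.utils.logging.getLogger returns an LSST logger that adds a VERBOSE level between INFO and DEBUG. A small sketch, assuming that API:

```python
from lsst.utils.logging import getLogger

# getLogger wraps the stdlib logger with extra convenience levels; VERBOSE
# messages are hidden at the default INFO threshold but do not require full
# DEBUG output to surface.
_LOG = getLogger(__name__)
_LOG.verbose("Writing QuantumGraph to %r.", "graph.qgraph")
```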
@@ -415,7 +415,7 @@ def defineDatastoreCache() -> None:
_LOG.debug("Defining shared datastore cache directory to %s", cache_dir)

@classmethod
def makeWriteButler(cls, args: SimpleNamespace, taskDefs: Iterable[TaskDef] | None = None) -> Butler:
def makeWriteButler(cls, args: SimpleNamespace, pipeline_graph: PipelineGraph | None = None) -> Butler:
"""Return a read-write butler initialized to write to and read from
the collections specified by the given command-line arguments.
@@ -424,7 +424,7 @@ def makeWriteButler(cls, args: SimpleNamespace, taskDefs: Iterable[TaskDef] | No
args : `types.SimpleNamespace`
Parsed command-line arguments. See class documentation for the
construction parameter of the same name.
taskDefs : iterable of `TaskDef`, optional
pipeline_graph : `lsst.pipe.base.PipelineGraph`, optional
Definitions for tasks in a pipeline. This argument is only needed
if ``args.replace_run`` is `True` and ``args.prune_replaced`` is
"unstore".
@@ -446,12 +446,17 @@ def makeWriteButler(cls, args: SimpleNamespace, taskDefs: Iterable[TaskDef] | No
if args.prune_replaced == "unstore":
# Remove datasets from datastore
with butler.transaction():
refs: Iterable[DatasetRef] = butler.registry.queryDatasets(..., collections=replaced)
# we want to remove regular outputs but keep
# initOutputs, configs, and versions.
if taskDefs is not None:
initDatasetNames = set(PipelineDatasetTypes.initOutputNames(taskDefs))
refs = [ref for ref in refs if ref.datasetType.name not in initDatasetNames]
# we want to remove regular outputs from this pipeline,
# but keep initOutputs, configs, and versions.
if pipeline_graph is not None:
refs = [
ref
for ref in butler.registry.queryDatasets(..., collections=replaced)
if (
(producer := pipeline_graph.producer_of(ref.datasetType.name)) is not None
and producer.key.node_type is NodeType.TASK # i.e. not TASK_INIT
)
]
butler.pruneDatasets(refs, unstore=True, disassociate=False)
elif args.prune_replaced == "purge":
# Erase entire collection and all datasets, need to remove
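The rewritten pruning logic no longer precomputes init-output dataset names from TaskDefs; it asks the PipelineGraph which node produces each dataset type and keeps only datasets written by runtime task nodes, so init-outputs, configs, and version datasets survive. A condensed sketch of that predicate, mirroring the diff's use of producer_of and NodeType (the helper function itself is hypothetical):

```python
from lsst.pipe.base.pipeline_graph import NodeType


def _is_runtime_output(pipeline_graph, dataset_type_name: str) -> bool:
    """Return True if the named dataset type is written by a task quantum.

    producer_of returns None for overall inputs; init-outputs hang off
    TASK_INIT nodes, so requiring NodeType.TASK excludes them.
    """
    producer = pipeline_graph.producer_of(dataset_type_name)
    return producer is not None and producer.key.node_type is NodeType.TASK
```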
@@ -618,21 +623,25 @@ def makeGraph(self, pipeline: Pipeline, args: SimpleNamespace) -> QuantumGraph |
if args.show_qgraph_header:
print(QuantumGraph.readHeader(args.qgraph))
else:
task_defs = list(pipeline.toExpandedPipeline())
pipeline_graph = pipeline.to_graph()
if args.mock:
from lsst.pipe.base.tests.mocks import mock_task_defs
from lsst.pipe.base.tests.mocks import mock_pipeline_graph

task_defs = mock_task_defs(
task_defs,
pipeline_graph = mock_pipeline_graph(
pipeline_graph,
unmocked_dataset_types=args.unmocked_dataset_types,
force_failures=args.mock_failure,
)
# make execution plan (a.k.a. DAG) for pipeline
graphBuilder = GraphBuilder(
butler.registry,
skipExistingIn=args.skip_existing_in,
clobberOutputs=args.clobber_outputs,
datastore=butler._datastore if args.qgraph_datastore_records else None,
graph_builder = AllDimensionsQuantumGraphBuilder(
pipeline_graph,
butler,
where=args.data_query,
skip_existing_in=args.skip_existing_in if args.skip_existing_in is not None else (),
clobber=args.clobber_outputs,
dataset_query_constraint=args.dataset_query_constraint,
input_collections=collections,
output_run=run,
)
# accumulate metadata
metadata = {
Expand All @@ -648,15 +657,7 @@ def makeGraph(self, pipeline: Pipeline, args: SimpleNamespace) -> QuantumGraph |
"time": f"{datetime.datetime.now()}",
}
assert run is not None, "Butler output run collection must be defined"
qgraph = graphBuilder.makeGraph(
task_defs,
collections,
run,
args.data_query,
metadata=metadata,
datasetQueryConstraint=args.dataset_query_constraint,
dataId=pipeline.get_data_id(butler.dimensions),
)
qgraph = graph_builder.build(metadata, attach_datastore_records=args.qgraph_datastore_records)
if args.show_qgraph_header:
qgraph.buildAndPrintHeader()

@@ -666,6 +667,7 @@ def makeGraph(self, pipeline: Pipeline, args: SimpleNamespace) -> QuantumGraph |
self._summarize_qgraph(qgraph)

if args.save_qgraph:
_LOG.verbose("Writing QuantumGraph to %r.", args.save_qgraph)
qgraph.saveUri(args.save_qgraph)

if args.save_single_quanta:
Expand All @@ -675,9 +677,11 @@ def makeGraph(self, pipeline: Pipeline, args: SimpleNamespace) -> QuantumGraph |
sqgraph.saveUri(uri)

if args.qgraph_dot:
_LOG.verbose("Writing quantum graph DOT visualization to %r.", args.qgraph_dot)
graph2dot(qgraph, args.qgraph_dot)

if args.execution_butler_location:
_LOG.verbose("Writing execution butler to %r.", args.execution_butler_location)
butler = Butler.from_config(args.butler_config)
assert isinstance(butler, DirectButler), "Execution butler needs DirectButler"
newArgs = copy.deepcopy(args)
Expand Down Expand Up @@ -775,7 +779,7 @@ def runPipeline(
# Make butler instance. QuantumGraph should have an output run defined,
# but we ignore it here and let command line decide actual output run.
if butler is None:
butler = _ButlerFactory.makeWriteButler(args, graph.iterTaskGraph())
butler = _ButlerFactory.makeWriteButler(args, graph.pipeline_graph)

if args.skip_existing:
args.skip_existing_in += (butler.run,)
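Taken together, the makeGraph changes collapse the old two-step of constructing a GraphBuilder and then calling makeGraph(...) into a single builder object that receives the pipeline graph, butler, and query up front and is asked to build once. A condensed sketch of the new flow; the repository path, pipeline file, collections, and query string are placeholders, and Pipeline.from_uri is assumed from pipe_base:

```python
from lsst.daf.butler import Butler
from lsst.pipe.base import Pipeline
from lsst.pipe.base.all_dimensions_quantum_graph_builder import AllDimensionsQuantumGraphBuilder

butler = Butler.from_config("/path/to/repo")  # placeholder repo
pipeline_graph = Pipeline.from_uri("my_pipeline.yaml").to_graph()

builder = AllDimensionsQuantumGraphBuilder(
    pipeline_graph,
    butler,
    where="instrument = 'HSC'",          # data ID query expression
    input_collections=["HSC/defaults"],  # placeholder collections
    output_run="u/someone/run",
)
# Datastore records are now requested at build time, replacing the old
# datastore= constructor argument on GraphBuilder.
qgraph = builder.build(metadata={}, attach_datastore_records=False)
```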
7 changes: 6 additions & 1 deletion python/lsst/ctrl/mpexec/dotTools.py
@@ -39,6 +39,7 @@
import html
import io
import re
import warnings
from collections.abc import Iterable
from typing import TYPE_CHECKING, Any

@@ -280,7 +281,11 @@ def expand_dimensions(connection: connectionTypes.BaseConnection) -> list[str]:

allDatasets: set[str | tuple[str, str]] = set()
if isinstance(pipeline, Pipeline):
pipeline = pipeline.toExpandedPipeline()
# TODO: DM-40639 will rewrite this code and finish off the deprecation
# of toExpandedPipeline.
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=FutureWarning)
pipeline = pipeline.toExpandedPipeline()

# The next two lines are a workaround until DM-29658 at which time metadata
# connections should start working with the above code
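Until DM-40639 rewrites this code to consume a PipelineGraph, pipeline2dot still needs the deprecated toExpandedPipeline, so it silences the FutureWarning locally rather than passing it on to every caller. The same pattern in isolation (the pipeline file name is a placeholder):

```python
import warnings

from lsst.pipe.base import Pipeline

pipeline = Pipeline.from_uri("my_pipeline.yaml")  # placeholder pipeline file

# Suppress only FutureWarning, and only inside this block; warnings raised
# elsewhere are unaffected.
with warnings.catch_warnings():
    warnings.simplefilter("ignore", category=FutureWarning)
    task_defs = list(pipeline.toExpandedPipeline())
```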
39 changes: 30 additions & 9 deletions python/lsst/ctrl/mpexec/log_capture.py
@@ -33,13 +33,16 @@
import os
import shutil
import tempfile
import warnings
from collections.abc import Iterator
from contextlib import contextmanager, suppress
from logging import FileHandler

from lsst.daf.butler import Butler, FileDataset, LimitedButler, Quantum
from lsst.daf.butler.logging import ButlerLogRecordHandler, ButlerLogRecords, ButlerMDC, JsonLogFormatter
from lsst.pipe.base import InvalidQuantumError, TaskDef
from lsst.pipe.base.pipeline_graph import TaskNode
from lsst.utils.introspection import find_outside_stacklevel

_LOG = logging.getLogger(__name__)

@@ -85,13 +88,17 @@ def from_full(cls, butler: Butler) -> LogCapture:
return cls(butler, butler)

@contextmanager
def capture_logging(self, taskDef: TaskDef, quantum: Quantum) -> Iterator[_LogCaptureFlag]:
def capture_logging(
self, task_node: TaskDef | TaskNode, /, quantum: Quantum
) -> Iterator[_LogCaptureFlag]:
"""Configure logging system to capture logs for execution of this task.
Parameters
----------
taskDef : `lsst.pipe.base.TaskDef`
The task definition.
task_node : `lsst.pipe.base.TaskDef` or \
`~lsst.pipe.base.pipeline_graph.TaskNode`
The task definition. Support for `~lsst.pipe.base.TaskDef` is
deprecated and will be removed after v27.
quantum : `~lsst.daf.butler.Quantum`
Single Quantum instance.
@@ -103,23 +110,37 @@ def capture_logging(self, taskDef: TaskDef, quantum: Quantum) -> Iterator[_LogCa
.. code-block:: py
with self.capture_logging(taskDef, quantum):
with self.capture_logging(task_node, quantum):
# Run quantum and capture logs.
This method can also set up logging to attach task- or
quantum-specific information to log messages. Potentially this can
take into account some info from task configuration as well.
"""
# include quantum dataId and task label into MDC
mdc = {"LABEL": taskDef.label, "RUN": ""}
mdc = {"LABEL": task_node.label, "RUN": ""}
if quantum.dataId:
mdc["LABEL"] += f":{quantum.dataId}"
if self.full_butler is not None:
mdc["RUN"] = self.full_butler.run or ""
ctx = _LogCaptureFlag()

if isinstance(task_node, TaskDef):
# TODO: remove this block and associated docs and annotations on
# DM-40443.
log_dataset_name = task_node.logOutputDatasetName
warnings.warn(
"Passing TaskDef instances to LogCapture is deprecated and will not be supported after v27.",
FutureWarning,
find_outside_stacklevel("lsst.ctrl.mpexec"),
)
else:
log_dataset_name = (
task_node.log_output.dataset_type_name if task_node.log_output is not None else None
)

# Add a handler to the root logger to capture execution log output.
if taskDef.logOutputDatasetName is not None:
if log_dataset_name is not None:
# Either accumulate into ButlerLogRecords or stream JSON records to
# file and ingest that (ingest is possible only with full butler).
if self.stream_json_logs and self.full_butler is not None:
@@ -132,7 +153,7 @@ def capture_logging(self, taskDef: TaskDef, quantum: Quantum) -> Iterator[_LogCa
tmpdir = tempfile.mkdtemp(prefix="butler-temp-logs-")

# Construct a file to receive the log records and "touch" it.
log_file = os.path.join(tmpdir, f"butler-log-{taskDef.label}.json")
log_file = os.path.join(tmpdir, f"butler-log-{task_node.label}.json")
with open(log_file, "w"):
pass
log_handler_file = FileHandler(log_file)
@@ -147,7 +168,7 @@ def capture_logging(self, taskDef: TaskDef, quantum: Quantum) -> Iterator[_LogCa
logging.getLogger().removeHandler(log_handler_file)
log_handler_file.close()
if ctx.store:
self._ingest_log_records(quantum, taskDef.logOutputDatasetName, log_file)
self._ingest_log_records(quantum, log_dataset_name, log_file)
shutil.rmtree(tmpdir, ignore_errors=True)

else:
@@ -161,7 +182,7 @@ def capture_logging(self, taskDef: TaskDef, quantum: Quantum) -> Iterator[_LogCa
# Ensure that the logs are stored in butler.
logging.getLogger().removeHandler(log_handler_memory)
if ctx.store:
self._store_log_records(quantum, taskDef.logOutputDatasetName, log_handler_memory)
self._store_log_records(quantum, log_dataset_name, log_handler_memory)
log_handler_memory.records.clear()

else:
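The capture_logging changes are a textbook soft-deprecation shim: accept both the old and new types, normalize immediately to the one value the method actually needs, and aim the FutureWarning at the first frame outside the package via find_outside_stacklevel so callers see their own line in the warning. A self-contained sketch of the pattern (the helper name is hypothetical; the attribute accesses mirror the diff):

```python
import warnings

from lsst.utils.introspection import find_outside_stacklevel


def _log_dataset_name(task_node) -> str | None:
    """Normalize old TaskDef and new TaskNode inputs to a dataset type name."""
    if hasattr(task_node, "logOutputDatasetName"):  # old TaskDef interface
        warnings.warn(
            "Passing TaskDef instances is deprecated and will not be supported after v27.",
            FutureWarning,
            stacklevel=find_outside_stacklevel("lsst.ctrl.mpexec"),
        )
        return task_node.logOutputDatasetName
    # New TaskNode interface: the log output connection may be disabled.
    return task_node.log_output.dataset_type_name if task_node.log_output is not None else None
```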
