Skip to content

Commit

Permalink
Use resolved references for HiPS graph (DM-39031)
Browse files Browse the repository at this point in the history
This commit adds two command line options to the graph builder script
which define output collection names, similarly to `pipetask qgraph`
command. Output run is used to resolve references when creating quantum
graph for HiPS task. At least one of the `--output` or `--output-runs`
options must be provided when invoking the script.
  • Loading branch information
andy-slac committed May 9, 2023
1 parent cb98b56 commit dc0f4bb
Showing 1 changed file with 52 additions and 8 deletions.
60 changes: 52 additions & 8 deletions python/lsst/pipe/tasks/hips.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

from lsst.sphgeom import RangeSet, HealpixPixelization
from lsst.utils.timer import timeMethod
from lsst.daf.butler import Butler, DatasetRef, Quantum, SkyPixDimension, UnresolvedRefWarning
from lsst.daf.butler import Butler, DatasetRef, Quantum, SkyPixDimension
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
import lsst.afw.geom as afwGeom
Expand Down Expand Up @@ -358,15 +358,33 @@ def build_quantum_graph_cli(cls, argv):
elif args.subparser_name == "build":
# Build the quantum graph.

# Figure out collection names.
if args.output_run is None:
if args.output is None:
raise ValueError("At least one of --output or --output-run options is required.")
args.output_run = "{}/{}".format(args.output, pipeBase.Instrument.makeCollectionTimestamp())

build_ranges = RangeSet(sorted(args.pixels))

# Metadata includes a subset of attributes defined in CmdLineFwk.
metadata = {
"input": args.input,
"butler_argument": args.butler_config,
"output": args.output,
"output_run": args.output_run,
"data_query": args.where,
"time": f"{datetime.now()}",
}

qg = cls.build_quantum_graph(
task_def,
butler.registry,
args.hpix_build_order,
build_ranges,
output_run=args.output_run,
where=args.where,
collections=args.input
collections=args.input,
metadata=metadata,
)
qg.saveUri(args.save_qgraph)

Expand Down Expand Up @@ -429,6 +447,24 @@ def _make_cli_parser(cls):
help="Data ID expression used when querying for input coadd datasets.",
)

parser_build.add_argument(
"--output",
type=str,
help="Name of the output CHAINED collection.",
default=None,
metavar="COLL",
)
parser_build.add_argument(
"--output-run",
type=str,
help=(
"Output RUN collection to write resulting images. If not provided "
"then --output must be provided and a new RUN collection will be created "
"by appending a timestamp to the value passed with --output."
),
default=None,
metavar="RUN",
)
parser_build.add_argument(
"-q",
"--save-qgraph",
Expand All @@ -454,8 +490,10 @@ def build_quantum_graph(
registry,
constraint_order,
constraint_ranges,
output_run,
where=None,
collections=None,
metadata=None,
):
"""Generate a `QuantumGraph` for running just this task.
Expand All @@ -477,6 +515,10 @@ def build_quantum_graph(
constraint_ranges : `lsst.sphgeom.RangeSet`
RangeSet which describes constraint pixels (HEALPix NEST, with order
constraint_order) to constrain generated quanta.
output : `str`, optional
Name of the output CHAINED collection, may be `None`.
output_run : `str`
Name of the output RUN collection.
where : `str`, optional
A boolean `str` expression of the form accepted by
`Registry.queryDatasets` to constrain input datasets. This may
Expand All @@ -489,6 +531,8 @@ def build_quantum_graph(
Collection or collections to search for input datasets, in order.
If not provided, ``registry.defaults.collections`` will be
searched.
butler_config : `str`, optional
Butler configuration URI to store in graph metadata.
"""
config = task_def.config

Expand Down Expand Up @@ -598,11 +642,11 @@ def build_quantum_graph(
output_data_ids.append(
registry.expandDataId({hpx_output_dimension: hpx_output_index, "band": band})
)
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=UnresolvedRefWarning)
outputs = {dt: [DatasetRef(dt, data_id)] for dt in incidental_output_dataset_types}
outputs[output_dataset_type] = [DatasetRef(output_dataset_type, data_id)
for data_id in output_data_ids]
outputs = {
dt: [DatasetRef(dt, data_id, run=output_run)] for dt in incidental_output_dataset_types
}
outputs[output_dataset_type] = [DatasetRef(output_dataset_type, data_id, run=output_run)
for data_id in output_data_ids]
quanta.append(
Quantum(
taskName=task_def.taskName,
Expand All @@ -617,7 +661,7 @@ def build_quantum_graph(
if len(quanta) == 0:
raise RuntimeError("Given constraints yielded empty quantum graph.")

return pipeBase.QuantumGraph(quanta={task_def: quanta})
return pipeBase.QuantumGraph(quanta={task_def: quanta}, metadata=metadata)


class HipsPropertiesSpectralTerm(pexConfig.Config):
Expand Down

0 comments on commit dc0f4bb

Please sign in to comment.