Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-39031: Use resolved references for HiPS graph #784

Merged
merged 1 commit into from
May 9, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
60 changes: 52 additions & 8 deletions python/lsst/pipe/tasks/hips.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

from lsst.sphgeom import RangeSet, HealpixPixelization
from lsst.utils.timer import timeMethod
from lsst.daf.butler import Butler, DatasetRef, Quantum, SkyPixDimension, UnresolvedRefWarning
from lsst.daf.butler import Butler, DatasetRef, Quantum, SkyPixDimension
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
import lsst.afw.geom as afwGeom
Expand Down Expand Up @@ -358,15 +358,32 @@ def build_quantum_graph_cli(cls, argv):
elif args.subparser_name == "build":
# Build the quantum graph.

# Figure out collection names.
if args.output_run is None:
if args.output is None:
raise ValueError("At least one of --output or --output-run options is required.")
args.output_run = "{}/{}".format(args.output, pipeBase.Instrument.makeCollectionTimestamp())

build_ranges = RangeSet(sorted(args.pixels))

# Metadata includes a subset of attributes defined in CmdLineFwk.
metadata = {
"input": args.input,
"butler_argument": args.butler_config,
"output": args.output,
"output_run": args.output_run,
"data_query": args.where,
"time": f"{datetime.now()}",
}

qg = cls.build_quantum_graph(
task_def,
butler.registry,
args.hpix_build_order,
build_ranges,
where=args.where,
collections=args.input
collections=args.input,
metadata=metadata,
)
qg.saveUri(args.save_qgraph)

Expand Down Expand Up @@ -429,6 +446,28 @@ def _make_cli_parser(cls):
help="Data ID expression used when querying for input coadd datasets.",
)

parser_build.add_argument(
"--output",
type=str,
help=(
"Name of the output CHAINED collection. If this options is specified and "
"--output-run is not, then a new RUN collection will be created by appending "
"a timestamp to the value of this option."
),
default=None,
metavar="COLL",
)
parser_build.add_argument(
"--output-run",
type=str,
help=(
"Output RUN collection to write resulting images. If not provided "
"then --output must be provided and a new RUN collection will be created "
"by appending a timestamp to the value passed with --output."
),
default=None,
metavar="RUN",
)
parser_build.add_argument(
"-q",
"--save-qgraph",
Expand Down Expand Up @@ -456,6 +495,7 @@ def build_quantum_graph(
constraint_ranges,
where=None,
collections=None,
metadata=None,
andy-slac marked this conversation as resolved.
Show resolved Hide resolved
):
"""Generate a `QuantumGraph` for running just this task.

Expand Down Expand Up @@ -489,6 +529,9 @@ def build_quantum_graph(
Collection or collections to search for input datasets, in order.
If not provided, ``registry.defaults.collections`` will be
searched.
metadata : `dict` [ `str`, `Any` ]
Graph metadata. It is required to contain "output_run" key with the
name of the output RUN collection.
"""
config = task_def.config

Expand Down Expand Up @@ -598,11 +641,12 @@ def build_quantum_graph(
output_data_ids.append(
registry.expandDataId({hpx_output_dimension: hpx_output_index, "band": band})
)
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=UnresolvedRefWarning)
outputs = {dt: [DatasetRef(dt, data_id)] for dt in incidental_output_dataset_types}
outputs[output_dataset_type] = [DatasetRef(output_dataset_type, data_id)
for data_id in output_data_ids]
output_run = metadata["output_run"]
outputs = {
dt: [DatasetRef(dt, data_id, run=output_run)] for dt in incidental_output_dataset_types
}
outputs[output_dataset_type] = [DatasetRef(output_dataset_type, data_id, run=output_run)
for data_id in output_data_ids]
quanta.append(
Quantum(
taskName=task_def.taskName,
Expand All @@ -617,7 +661,7 @@ def build_quantum_graph(
if len(quanta) == 0:
raise RuntimeError("Given constraints yielded empty quantum graph.")

return pipeBase.QuantumGraph(quanta={task_def: quanta})
return pipeBase.QuantumGraph(quanta={task_def: quanta}, metadata=metadata)


class HipsPropertiesSpectralTerm(pexConfig.Config):
Expand Down