Skip to content

Commit

Permalink
Add InitOutputs to generated quantum graph (DM-39122)
Browse files Browse the repository at this point in the history
Quantum graph generated by HiPS task now includes references for task
config datasets in per-task InitOutputs and a reference for packages
dataset in global init outputs.
  • Loading branch information
andy-slac committed May 16, 2023
1 parent 5e700ac commit 23ab85d
Showing 1 changed file with 18 additions and 3 deletions.
21 changes: 18 additions & 3 deletions python/lsst/pipe/tasks/hips.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

from lsst.sphgeom import RangeSet, HealpixPixelization
from lsst.utils.timer import timeMethod
from lsst.daf.butler import Butler, DatasetRef, Quantum, SkyPixDimension
from lsst.daf.butler import Butler, DataCoordinate, DatasetRef, Quantum, SkyPixDimension
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
import lsst.afw.geom as afwGeom
Expand Down Expand Up @@ -624,6 +624,7 @@ def build_quantum_graph(
inputs_by_hpx[hpx_index].update(input_refs_for_patch)
# Iterate over the dict we just created and create the actual quanta.
quanta = []
output_run = metadata["output_run"]
for hpx_index, input_refs_for_hpx_index in inputs_by_hpx.items():
# Group inputs by band.
input_refs_by_band = defaultdict(list)
Expand All @@ -641,7 +642,6 @@ def build_quantum_graph(
output_data_ids.append(
registry.expandDataId({hpx_output_dimension: hpx_output_index, "band": band})
)
output_run = metadata["output_run"]
outputs = {
dt: [DatasetRef(dt, data_id, run=output_run)] for dt in incidental_output_dataset_types
}
Expand All @@ -661,7 +661,22 @@ def build_quantum_graph(
if len(quanta) == 0:
raise RuntimeError("Given constraints yielded empty quantum graph.")

return pipeBase.QuantumGraph(quanta={task_def: quanta}, metadata=metadata)
# Define initOutputs refs.
empty_data_id = DataCoordinate.makeEmpty(registry.dimensions)
init_outputs = {}
global_init_outputs = []
if config_dataset_type := dataset_types.initOutputs.get(task_def.configDatasetName):
init_outputs[task_def] = [DatasetRef(config_dataset_type, empty_data_id, run=output_run)]
packages_dataset_name = pipeBase.PipelineDatasetTypes.packagesDatasetName
if packages_dataset_type := dataset_types.initOutputs.get(packages_dataset_name):
global_init_outputs.append(DatasetRef(packages_dataset_type, empty_data_id, run=output_run))

return pipeBase.QuantumGraph(
quanta={task_def: quanta},
initOutputs=init_outputs,
globalInitOutputs=global_init_outputs,
metadata=metadata,
)


class HipsPropertiesSpectralTerm(pexConfig.Config):
Expand Down

0 comments on commit 23ab85d

Please sign in to comment.