Skip to content

Commit

Permalink
Merge pull request #36 from lsst/tickets/DM-40849
Browse files Browse the repository at this point in the history
DM-40849: Use SeparablePipelineExecutor to fix collection behavior
  • Loading branch information
mfisherlevine committed Sep 26, 2023
2 parents 79216fc + 02c7f75 commit e0642bc
Showing 1 changed file with 17 additions and 16 deletions.
33 changes: 17 additions & 16 deletions python/lsst/atmospec/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@
import sys
import lsst.afw.math as afwMath
import lsst.afw.image as afwImage
from lsst.ctrl.mpexec import SimplePipelineExecutor
from lsst.ctrl.mpexec import SeparablePipelineExecutor
import lsst.afw.geom as afwGeom
import lsst.geom as geom
import lsst.daf.butler as dafButler
from lsst.daf.butler.registry import RegistryDefaults

from astro_metadata_translator import ObservationInfo
import lsst.pex.config as pexConfig
from lsst.pipe.base import Pipeline
Expand Down Expand Up @@ -633,10 +635,13 @@ def makeQuery(dataId):
if extraInputCollections is not None:
extraInputCollections = ensure_iterable(extraInputCollections)
inputs.extend(extraInputCollections)
butler = SimplePipelineExecutor.prep_butler(repo,
inputs=inputs,
output=outputCollection)

butler = dafButler.Butler(repo, writeable=True, collections=inputs)

butler.registry.registerCollection(outputCollection, dafButler.CollectionType.CHAINED)
run = outputCollection + '/run'
butler.registry.defaults = RegistryDefaults(collections=outputCollection, run=run)
butler.registry.setCollectionChain(outputCollection, [run] + inputs)
pipeline = Pipeline.fromFile("${ATMOSPEC_DIR}/pipelines/processStar.yaml")

for taskName, configClass in taskConfigs.items():
Expand All @@ -658,18 +663,14 @@ def makeQuery(dataId):
pipeline.addConfigOverride(taskName, option, value)

query = makeQuery(dataId)
executor = SimplePipelineExecutor.from_pipeline(pipeline,
where=query,
root=repo,
butler=butler)
executor = SeparablePipelineExecutor(butler, clobber_output=True)

quantumGraph = executor.make_quantum_graph(pipeline, where=query)
executor.pre_execute_qgraph(quantumGraph, save_versions=False)

logging.basicConfig(level=logging.INFO, stream=sys.stdout)
quanta = executor.run()

# quanta is just a plain list, but the items know their names, so rather
# than just taking the third item and relying on that being the position in
# the pipeline we get the item by name
processStarQuantum = [q for q in quanta if q.taskName == 'lsst.atmospec.processStar.ProcessStarTask'][0]
dataRef = processStarQuantum.outputs['spectractorSpectrum'][0]
result = butler.get(dataRef)
executor.run_pipeline(quantumGraph, fail_fast=True)

butler.registry.refresh()
result = butler.get('spectractorSpectrum', dataId)
return result

0 comments on commit e0642bc

Please sign in to comment.