Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-40849: Use SeparablePipelineExecutor to fix collection behavior #36

Merged
merged 1 commit into from
Sep 26, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
33 changes: 17 additions & 16 deletions python/lsst/atmospec/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@
import sys
import lsst.afw.math as afwMath
import lsst.afw.image as afwImage
from lsst.ctrl.mpexec import SimplePipelineExecutor
from lsst.ctrl.mpexec import SeparablePipelineExecutor
import lsst.afw.geom as afwGeom
import lsst.geom as geom
import lsst.daf.butler as dafButler
from lsst.daf.butler.registry import RegistryDefaults

from astro_metadata_translator import ObservationInfo
import lsst.pex.config as pexConfig
from lsst.pipe.base import Pipeline
Expand Down Expand Up @@ -633,10 +635,13 @@ def makeQuery(dataId):
if extraInputCollections is not None:
extraInputCollections = ensure_iterable(extraInputCollections)
inputs.extend(extraInputCollections)
butler = SimplePipelineExecutor.prep_butler(repo,
inputs=inputs,
output=outputCollection)

butler = dafButler.Butler(repo, writeable=True, collections=inputs)

butler.registry.registerCollection(outputCollection, dafButler.CollectionType.CHAINED)
run = outputCollection + '/run'
butler.registry.defaults = RegistryDefaults(collections=outputCollection, run=run)
butler.registry.setCollectionChain(outputCollection, [run] + inputs)
pipeline = Pipeline.fromFile("${ATMOSPEC_DIR}/pipelines/processStar.yaml")

for taskName, configClass in taskConfigs.items():
Expand All @@ -658,18 +663,14 @@ def makeQuery(dataId):
pipeline.addConfigOverride(taskName, option, value)

query = makeQuery(dataId)
executor = SimplePipelineExecutor.from_pipeline(pipeline,
where=query,
root=repo,
butler=butler)
executor = SeparablePipelineExecutor(butler, clobber_output=True)

quantumGraph = executor.make_quantum_graph(pipeline, where=query)
executor.pre_execute_qgraph(quantumGraph, save_versions=False)

logging.basicConfig(level=logging.INFO, stream=sys.stdout)
quanta = executor.run()

# quanta is just a plain list, but the items know their names, so rather
# than just taking the third item and relying on that being the position in
# the pipeline we get the item by name
processStarQuantum = [q for q in quanta if q.taskName == 'lsst.atmospec.processStar.ProcessStarTask'][0]
dataRef = processStarQuantum.outputs['spectractorSpectrum'][0]
result = butler.get(dataRef)
executor.run_pipeline(quantumGraph, fail_fast=True)

butler.registry.refresh()
result = butler.get('spectractorSpectrum', dataId)
return result