Skip to content

Commit

Permalink
Merge pull request #110 from lsst/tickets/DM-27153
Browse files Browse the repository at this point in the history
DM-27153: adapt to daf_butler changes to defaulting of collections and data IDs
  • Loading branch information
TallJimbo committed Jan 12, 2021
2 parents 834ab33 + a0f2fe8 commit 7f4067b
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 22 deletions.
38 changes: 20 additions & 18 deletions python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import logging
import re
import sys
from typing import List, Optional, Tuple
from typing import Optional, Tuple
import warnings

# -----------------------------
Expand All @@ -42,10 +42,9 @@
Butler,
CollectionSearch,
CollectionType,
DatasetTypeRestriction,
Registry,
)
from lsst.daf.butler.registry import MissingCollectionError
from lsst.daf.butler.registry import MissingCollectionError, RegistryDefaults
import lsst.pex.config as pexConfig
from lsst.pipe.base import GraphBuilder, Pipeline, QuantumGraph
from lsst.obs.base import Instrument
Expand Down Expand Up @@ -78,10 +77,10 @@ class _OutputChainedCollectionInfo:
def __init__(self, registry: Registry, name: str):
self.name = name
try:
self.chain = list(registry.getCollectionChain(name))
self.chain = tuple(registry.getCollectionChain(name))
self.exists = True
except MissingCollectionError:
self.chain = []
self.chain = ()
self.exists = False

def __str__(self):
Expand All @@ -95,10 +94,10 @@ def __str__(self):
"""Whether this collection already exists in the registry (`bool`).
"""

chain: List[str]
"""The definition of the collection, if it already exists (`list`).
chain: Tuple[str, ...]
"""The definition of the collection, if it already exists (`tuple` [`str`]).
Empty if the collection does not alredy exist.
Empty if the collection does not already exist.
"""


Expand Down Expand Up @@ -200,7 +199,7 @@ def __init__(self, registry: Registry, args: argparse.Namespace, writeable: bool
self.outputRun = None
else:
raise ValueError("Cannot write without at least one of (--output, --output-run).")
self.inputs = list(CollectionSearch.fromExpression(args.input)) if args.input else []
self.inputs = tuple(CollectionSearch.fromExpression(args.input)) if args.input else ()

def check(self, args: argparse.Namespace):
"""Check command-line options for consistency with each other and the
Expand Down Expand Up @@ -355,16 +354,20 @@ def makeWriteButler(cls, args: argparse.Namespace) -> Butler:
raise NotImplementedError(
f"Unsupported --prune-replaced option '{args.prune_replaced}'."
)
chainDefinition.insert(0, self.outputRun.name)
chainDefinition = CollectionSearch.fromExpression(chainDefinition)
if not self.output.exists:
butler.registry.registerCollection(self.output.name, CollectionType.CHAINED)
if not args.extend_run:
butler.registry.registerCollection(self.outputRun.name, CollectionType.RUN)
chainDefinition.insert(0, self.outputRun.name)
butler.registry.setCollectionChain(self.output.name, chainDefinition)
_LOG.debug("Preparing butler to write to '%s' and read from '%s'=%s",
self.outputRun.name, self.output.name, chainDefinition)
return Butler(butler=butler, run=self.outputRun.name, collections=self.output.name,
chains={self.output.name: chainDefinition})
butler.registry.defaults = RegistryDefaults(run=self.outputRun.name, collections=self.output.name)
else:
inputs = CollectionSearch.fromExpression([self.outputRun.name] + self.inputs)
inputs = CollectionSearch.fromExpression((self.outputRun.name,) + self.inputs)
_LOG.debug("Preparing butler to write to '%s' and read from %s.", self.outputRun.name, inputs)
return Butler(butler=butler, run=self.outputRun.name, collections=inputs)
butler.registry.defaults = RegistryDefaults(run=self.outputRun.name, collections=inputs)
return butler

output: Optional[_OutputChainedCollectionInfo]
"""Information about the output chained collection, if there is or will be
Expand All @@ -376,9 +379,8 @@ def makeWriteButler(cls, args: argparse.Namespace) -> Butler:
one (`_OutputRunCollectionInfo` or `None`).
"""

inputs: List[Tuple[str, DatasetTypeRestriction]]
"""Input collections, including those also used for outputs and any
restrictions on dataset types (`list`).
inputs: Tuple[str, ...]
"""Input collections provided directly by the user (`tuple` [ `str` ]).
"""


Expand Down
8 changes: 4 additions & 4 deletions tests/test_cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ def testSimpleQGraphReplaceRun(self):
self.assertEqual(taskFactory.countExec, nQuanta)

# need to refresh collections explicitly (or make new butler/registry)
butler.registry._collections.refresh()
butler.registry.refresh()
collections = set(butler.registry.queryCollections(...))
self.assertEqual(collections, {"test", "output", "output/run1"})

Expand All @@ -539,7 +539,7 @@ def testSimpleQGraphReplaceRun(self):
args.output_run = "output/run2"
fwk.runPipeline(copy.deepcopy(qgraph), taskFactory, args)

butler.registry._collections.refresh()
butler.registry.refresh()
collections = set(butler.registry.queryCollections(...))
self.assertEqual(collections, {"test", "output", "output/run1", "output/run2"})

Expand All @@ -558,7 +558,7 @@ def testSimpleQGraphReplaceRun(self):
args.output_run = "output/run3"
fwk.runPipeline(copy.deepcopy(qgraph), taskFactory, args)

butler.registry._collections.refresh()
butler.registry.refresh()
collections = set(butler.registry.queryCollections(...))
self.assertEqual(collections, {"test", "output", "output/run1", "output/run2", "output/run3"})

Expand All @@ -581,7 +581,7 @@ def testSimpleQGraphReplaceRun(self):
args.output_run = "output/run4"
fwk.runPipeline(copy.deepcopy(qgraph), taskFactory, args)

butler.registry._collections.refresh()
butler.registry.refresh()
collections = set(butler.registry.queryCollections(...))
# output/run3 should disappear now
self.assertEqual(collections, {"test", "output", "output/run1", "output/run2", "output/run4"})
Expand Down

0 comments on commit 7f4067b

Please sign in to comment.