Skip to content

Commit

Permalink
Change tests to use the new pipetask API.
Browse files Browse the repository at this point in the history
  • Loading branch information
erykoff committed Oct 12, 2021
1 parent 9c3108e commit 7df4030
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 90 deletions.
191 changes: 101 additions & 90 deletions tests/fgcmcalTestBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,11 @@
import glob
import esutil

import click.testing
import lsst.ctrl.mpexec.cli.pipetask

import lsst.daf.butler as dafButler
import lsst.obs.base as obsBase
import lsst.geom as geom
import lsst.log
from lsst.pipe.base import Pipeline
from lsst.ctrl.mpexec import SimplePipelineExecutor

import lsst.fgcmcal as fgcmcal

Expand All @@ -65,8 +63,6 @@ def _importRepository(cls, instrument, exportPath, exportFile):
"""
cls.repo = os.path.join(cls.testDir, 'testrepo')

print('Importing %s into %s' % (exportFile, cls.testDir))

# Make the repo and retrieve a writeable Butler
_ = dafButler.Butler.makeRepo(cls.repo)
butler = dafButler.Butler(cls.repo, writeable=True)
Expand All @@ -78,9 +74,9 @@ def _importRepository(cls, instrument, exportPath, exportFile):
transfer='symlink',
skip_dimensions={'instrument', 'detector', 'physical_filter'})

def _runPipeline(self, repo, pipelineFile, queryString=None,
def _runPipeline(self, repo, pipelineFile, queryString='',
inputCollections=None, outputCollection=None,
configFiles=None, configOptions=None,
configFiles={}, configOptions={},
registerDatasetTypes=False):
"""Run a pipeline via pipetask.
Expand All @@ -91,17 +87,25 @@ def _runPipeline(self, repo, pipelineFile, queryString=None,
pipelineFile : `str`
Pipeline definition file.
queryString : `str`, optional
String to use for "-d" data query.
inputCollections : `str`, optional
String to use for "-i" input collections (comma delimited).
Where query that defines the data to use.
inputCollections : `list` [`str`], optional
Input collections list.
outputCollection : `str`, optional
String to use for "-o" output collection.
configFiles : `list` [`str`], optional
List of config files to use (with "-C").
Output collection name.
configFiles : `dict` [`list`], optional
Dictionary of config files. The key of the ``configFiles``
dict is the relevant task label. The value of ``configFiles``
is a list of config files to apply (in order) to that task.
configOptions : `dict` [`dict`], optional
Dictionary of individual config options. The key of the
``configOptions`` dict is the relevant task label. The value
of ``configOptions`` is another dict that contains config
key/value overrides to apply.
configOptions : `list` [`str`], optional
List of individual config options to use (with "-c").
List of individual config options to use. Each string will
be of the form ``taskName:configField=value``.
registerDatasetTypes : `bool`, optional
Set "--register-dataset-types".
Register new dataset types?
Returns
-------
Expand All @@ -112,31 +116,25 @@ def _runPipeline(self, repo, pipelineFile, queryString=None,
------
RuntimeError : Raised if the "pipetask" call fails.
"""
pipelineArgs = ["run",
"-b", repo,
"-p", pipelineFile]

if queryString is not None:
pipelineArgs.extend(["-d", queryString])
if inputCollections is not None:
pipelineArgs.extend(["-i", inputCollections])
if outputCollection is not None:
pipelineArgs.extend(["-o", outputCollection])
if configFiles is not None:
for configFile in configFiles:
pipelineArgs.extend(["-C", configFile])
if configOptions is not None:
for configOption in configOptions:
pipelineArgs.extend(["-c", configOption])
if registerDatasetTypes:
pipelineArgs.extend(["--register-dataset-types"])

# CliRunner is an unsafe workaround for DM-26239
runner = click.testing.CliRunner()
results = runner.invoke(lsst.ctrl.mpexec.cli.pipetask.cli, pipelineArgs)
if results.exception:
raise RuntimeError("Pipeline %s failed." % (pipelineFile)) from results.exception
return results.exit_code
butler = SimplePipelineExecutor.prep_butler(repo,
inputs=inputCollections,
output=outputCollection)

pipeline = Pipeline.fromFile(pipelineFile)
for taskName, fileList in configFiles.items():
for fileName in fileList:
pipeline.addConfigFile(taskName, fileName)
for taskName, configDict in configOptions.items():
for option, value in configDict.items():
pipeline.addConfigOverride(taskName, option, value)

executor = SimplePipelineExecutor.from_pipeline(pipeline,
where=queryString,
root=repo,
butler=butler)
quanta = executor.run(register_dataset_types=registerDatasetTypes)

return len(quanta)

def _testFgcmMakeLut(self, instName, testName, nBand, i0Std, i0Recon, i10Std, i10Recon):
"""Test running of FgcmMakeLutTask
Expand All @@ -160,17 +158,17 @@ def _testFgcmMakeLut(self, instName, testName, nBand, i0Std, i0Recon, i10Std, i1
"""
instCamel = instName.title()

configFile = 'fgcmMakeLut:' + os.path.join(ROOT,
'config',
'fgcmMakeLut%s.py' % (instCamel))
configFiles = {'fgcmMakeLut': [os.path.join(ROOT,
'config',
f'fgcmMakeLut{instCamel}.py')]}
outputCollection = f'{instName}/{testName}/lut'

self._runPipeline(self.repo,
os.path.join(ROOT,
'pipelines',
'fgcmMakeLut%s.yaml' % (instCamel)),
configFiles=[configFile],
inputCollections='%s/calib,%s/testdata' % (instName, instName),
f'fgcmMakeLut{instCamel}.yaml'),
configFiles=configFiles,
inputCollections=[f'{instName}/calib', f'{instName}/testdata'],
outputCollection=outputCollection,
registerDatasetTypes=True)

Expand Down Expand Up @@ -231,19 +229,21 @@ def _testFgcmBuildStarsTable(self, instName, testName, queryString, visits, nSta
"""
instCamel = instName.title()

configFile = 'fgcmBuildStarsTable:' + os.path.join(ROOT,
'config',
'fgcmBuildStarsTable%s.py' % (instCamel))
configFiles = {'fgcmBuildStarsTable': [os.path.join(ROOT,
'config',
f'fgcmBuildStarsTable{instCamel}.py')]}
outputCollection = f'{instName}/{testName}/buildstars'

self._runPipeline(self.repo,
os.path.join(ROOT,
'pipelines',
'fgcmBuildStarsTable%s.yaml' % (instCamel)),
configFiles=[configFile],
inputCollections=f'{instName}/{testName}/lut,refcats/gen2',
configFiles=configFiles,
inputCollections=[f'{instName}/{testName}/lut',
'refcats/gen2'],
outputCollection=outputCollection,
configOptions=['fgcmBuildStarsTable:ccdDataRefName=detector'],
configOptions={'fgcmBuildStarsTable':
{'ccdDataRefName': 'detector'}},
queryString=queryString,
registerDatasetTypes=True)

Expand Down Expand Up @@ -293,35 +293,41 @@ def _testFgcmFitCycle(self, instName, testName, cycleNumber,
"""
instCamel = instName.title()

configFiles = ['fgcmFitCycle:' + os.path.join(ROOT,
'config',
'fgcmFitCycle%s.py' % (instCamel))]
configFiles = {'fgcmFitCycle': [os.path.join(ROOT,
'config',
f'fgcmFitCycle{instCamel}.py')]}
if extraConfig is not None:
configFiles.append('fgcmFitCycle:' + extraConfig)
configFiles['fgcmFitCycle'].append(extraConfig)

outputCollection = f'{instName}/{testName}/fit'

if cycleNumber == 0:
inputCollections = f'{instName}/{testName}/buildstars'
inputCollections = [f'{instName}/{testName}/buildstars']
else:
# We are reusing the outputCollection so we can't specify the input
inputCollections = None
# In these tests we are running the fit cycle task multiple
# times into the same output collection. This code allows
# us to find the correct chained input collections to use
# so that we can both read from previous runs in the output
# collection and write to a new run in the output collection.
# Note that this behavior is handled automatically by the
# pipetask command-line interface, but not by the python
# API.
butler = dafButler.Butler(self.repo)
inputCollections = list(butler.registry.getCollectionChain(outputCollection))

cwd = os.getcwd()
runDir = os.path.join(self.testDir, testName)
os.makedirs(runDir, exist_ok=True)
os.chdir(runDir)

configOptions = ['fgcmFitCycle:cycleNumber=%d' % (cycleNumber),
'fgcmFitCycle:connections.previousCycleNumber=%d' %
(cycleNumber - 1),
'fgcmFitCycle:connections.cycleNumber=%d' %
(cycleNumber)]

configOptions = {'fgcmFitCycle':
{'cycleNumber': f'{cycleNumber}',
'connections.previousCycleNumber': f'{cycleNumber - 1}',
'connections.cycleNumber': f'{cycleNumber}'}}
self._runPipeline(self.repo,
os.path.join(ROOT,
'pipelines',
'fgcmFitCycle%s.yaml' % (instCamel)),
f'fgcmFitCycle{instCamel}.yaml'),
configFiles=configFiles,
inputCollections=inputCollections,
outputCollection=outputCollection,
Expand Down Expand Up @@ -390,20 +396,21 @@ def _testFgcmOutputProducts(self, instName, testName,
"""
instCamel = instName.title()

configFile = 'fgcmOutputProducts:' + os.path.join(ROOT,
'config',
'fgcmOutputProducts%s.py' % (instCamel))
configFiles = {'fgcmOutputProducts': [os.path.join(ROOT,
'config',
f'fgcmOutputProducts{instCamel}.py')]}
inputCollection = f'{instName}/{testName}/fit'
outputCollection = f'{instName}/{testName}/fit/output'

self._runPipeline(self.repo,
os.path.join(ROOT,
'pipelines',
'fgcmOutputProducts%s.yaml' % (instCamel)),
configFiles=[configFile],
inputCollections=inputCollection,
configFiles=configFiles,
inputCollections=[inputCollection],
outputCollection=outputCollection,
configOptions=['fgcmOutputProducts:doRefcatOutput=False'],
configOptions={'fgcmOutputProducts':
{'doRefcatOutput': 'False'}},
registerDatasetTypes=True)

butler = dafButler.Butler(self.repo)
Expand Down Expand Up @@ -587,15 +594,15 @@ def _testFgcmMultiFit(self, instName, testName, queryString, visits, zpOffsets):
"""
instCamel = instName.title()

configFiles = ['fgcmBuildStarsTable:' + os.path.join(ROOT,
'config',
f'fgcmBuildStarsTable{instCamel}.py'),
'fgcmFitCycle:' + os.path.join(ROOT,
'config',
f'fgcmFitCycle{instCamel}.py'),
'fgcmOutputProducts:' + os.path.join(ROOT,
configFiles = {'fgcmBuildStarsTable': [os.path.join(ROOT,
'config',
f'fgcmOutputProducts{instCamel}.py')]
f'fgcmBuildStarsTable{instCamel}.py')],
'fgcmFitCycle': [os.path.join(ROOT,
'config',
f'fgcmFitCycle{instCamel}.py')],
'fgcmOutputProducts': [os.path.join(ROOT,
'config',
f'fgcmOutputProducts{instCamel}.py')]}
outputCollection = f'{instName}/{testName}/unified'

cwd = os.getcwd()
Expand All @@ -608,9 +615,11 @@ def _testFgcmMultiFit(self, instName, testName, queryString, visits, zpOffsets):
'pipelines',
f'fgcmFullPipeline{instCamel}.yaml'),
configFiles=configFiles,
inputCollections=f'{instName}/{testName}/lut,refcats/gen2',
inputCollections=[f'{instName}/{testName}/lut',
'refcats/gen2'],
outputCollection=outputCollection,
configOptions=['fgcmBuildStarsTable:ccdDataRefName=detector'],
configOptions={'fgcmBuildStarsTable':
{'ccdDataRefName': 'detector'}},
queryString=queryString,
registerDatasetTypes=True)

Expand Down Expand Up @@ -703,15 +712,17 @@ def _testFgcmCalibrateTract(self, instName, testName, visits, tract, skymapName,
"""
instCamel = instName.title()

configFile = os.path.join(ROOT,
'config',
'fgcmCalibrateTractTable%s.py' % (instCamel))
configFiles = {'fgcmCalibrateTractTable':
[os.path.join(ROOT,
'config',
f'fgcmCalibrateTractTable{instCamel}.py')]}

configFiles = ['fgcmCalibrateTractTable:' + configFile]
outputCollection = f'{instName}/{testName}/tract'

inputCollections = f'{instName}/{testName}/lut,refcats/gen2'
configOption = 'fgcmCalibrateTractTable:fgcmOutputProducts.doRefcatOutput=False'
inputCollections = [f'{instName}/{testName}/lut',
'refcats/gen2']
configOptions = {'fgcmCalibrateTractTable':
{'fgcmOutputProducts.doRefcatOutput': 'False'}}

queryString = f"tract={tract:d} and skymap='{skymapName:s}'"

Expand All @@ -723,7 +734,7 @@ def _testFgcmCalibrateTract(self, instName, testName, visits, tract, skymapName,
configFiles=configFiles,
inputCollections=inputCollections,
outputCollection=outputCollection,
configOptions=[configOption],
configOptions=configOptions,
registerDatasetTypes=True)

butler = dafButler.Butler(self.repo)
Expand Down
3 changes: 3 additions & 0 deletions tests/test_fgcmcal_hsc.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import lsst.utils # noqa: E402
import lsst.pipe.tasks # noqa: E402
import lsst.daf.butler # noqa: E402

import fgcmcalTestBase # noqa: E402

Expand All @@ -53,6 +54,8 @@ def setUpClass(cls):
except LookupError:
raise unittest.SkipTest("obs_subaru not setup")

lsst.daf.butler.cli.cliLog.CliLog.initLog(longlog=False)

cls.testDir = tempfile.mkdtemp(dir=ROOT, prefix="TestFgcm-")

cls._importRepository('lsst.obs.subaru.HyperSuprimeCam',
Expand Down

0 comments on commit 7df4030

Please sign in to comment.