Merge pull request #13 from lsst/tickets/DM-31251
DM-31251: Add execution butler tests
timj committed Aug 2, 2021
2 parents e924637 + d6eaa40 commit 214f780
Showing 3 changed files with 168 additions and 5 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -5,3 +5,6 @@ staging/junk.yaml
.*dblite
single_ccd_graph.pkl
_build.*
*.qgraph
__pycache__
*execution_butler
73 changes: 68 additions & 5 deletions bin/run_demo.sh
@@ -43,33 +43,96 @@ if [ ! -d DATA_REPO/HSC/calib ]; then
fi

# ingestRaws.py doesn't search recursively; over-specifying to work around that.
if [ -z "$(find -L DATA_REPO/HSC/raw -type f)" ]; then
butler ingest-raws DATA_REPO input_data/HSC/raw/all/raw/r/HSC-R/
if [ -z "$(butler query-datasets DATA_REPO/ raw | grep HSC)" ]; then
butler ingest-raws DATA_REPO input_data/HSC/raw/all/raw/r/HSC-R/ --transfer direct
butler define-visits DATA_REPO HSC --collections HSC/raw/all
fi
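
# With --transfer direct the raw files are referenced in place rather than
# copied under DATA_REPO, which is presumably why the check above queries the
# registry for raw datasets instead of looking for files on disk.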

incoll="HSC/calib,HSC/raw/all,refcats"

# Pipeline execution will fail on a second attempt because the output run
# cannot be reused.
# Do not specify a number of processors (-j) to test that the default value
# works.
# The output collection name must match that used in the Python tests.
pipetask --long-log run -d "exposure=903342 AND detector=10" -b DATA_REPO/butler.yaml \
-    --input HSC/calib,HSC/raw/all,refcats \
+    --input "$incoll" \
--register-dataset-types -p "${PIPE_TASKS_DIR}/pipelines/DRP.yaml#processCcd" \
--instrument lsst.obs.subaru.HyperSuprimeCam --output-run demo_collection

# Do not provide a data query (-d) to verify that the code correctly handles
# an empty query.
pipetask qgraph -b DATA_REPO/butler.yaml \
-    --input HSC/calib,HSC/raw/all,refcats \
+    --input "$incoll" \
-p "${PIPE_TASKS_DIR}/pipelines/DRP.yaml#processCcd" \
--instrument lsst.obs.subaru.HyperSuprimeCam --output-run demo_collection_1

# Do a new, shorter run (isr only) and then repeat it with --replace-run so
# that the replacement output run supersedes the first.
pipetask run -d "exposure=903342 AND detector=10" -b DATA_REPO/butler.yaml \
-    --input HSC/calib,HSC/raw/all,refcats \
+    --input "$incoll" \
--register-dataset-types -p "${PIPE_TASKS_DIR}/pipelines/DRP.yaml#isr" \
--instrument lsst.obs.subaru.HyperSuprimeCam --output demo_collection2

pipetask run -d "exposure=903342 AND detector=10" -b DATA_REPO/butler.yaml \
--register-dataset-types -p "${PIPE_TASKS_DIR}/pipelines/DRP.yaml#isr" \
--instrument lsst.obs.subaru.HyperSuprimeCam --output demo_collection2 --replace-run

# Test the execution butler.
# Create an execution butler along with a new graph and output collection.

graph_file="test_exe.qgraph"
exedir="./execution_butler"
# This collection name must match that used in the Python tests.
exeoutput="demo_collection_exe"
exerun="$exeoutput/YYYYMMDD"

pipetask qgraph -b DATA_REPO/butler.yaml \
--input "$incoll" \
-p "${PIPE_TASKS_DIR}/pipelines/DRP.yaml#processCcd" \
-q "$graph_file" \
--save-execution-butler "$exedir" \
--clobber-execution-butler \
--instrument lsst.obs.subaru.HyperSuprimeCam --output "$exeoutput" --output-run "$exerun"

# Run the execution butler in multiple steps, ensuring that a fresh
# butler is used each time.

tmp_butler="./tmp_execution_butler"
refresh_exe() {
rm -rf "$tmp_butler"
mkdir "$tmp_butler"
cp "$exedir"/* "$tmp_butler/"
}
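
# The execution butler is essentially a small set of files (a butler.yaml plus
# a SQLite registry), so copying it gives each step below a pristine registry
# to start from.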

refresh_exe

# Run the init step
pipetask --long-log run -b "$tmp_butler" -i "$incoll" --output-run "$exerun" --init-only --register-dataset-types --qgraph "$graph_file" --extend-run
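
# (With --init-only, pipetask only registers the dataset types and writes the
# init outputs such as task configs; no quanta are executed.)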

# Run the three quanta one at a time to ensure that we can start from a
# clean execution butler every time.
refresh_exe
node=0
pipetask --long-log run -b "$tmp_butler" --output-run "$exerun" --qgraph "$graph_file" --qgraph-node-id "$node" --skip-init-writes --extend-run --clobber-outputs --skip-existing

refresh_exe
node=1
pipetask --long-log run -b "$tmp_butler" --output-run "$exerun" --qgraph "$graph_file" --qgraph-node-id $node --skip-init-writes --extend-run --clobber-outputs --skip-existing

refresh_exe
node=2
pipetask --long-log run -b "$tmp_butler" --output-run "$exerun" --qgraph "$graph_file" --qgraph-node-id $node --skip-init-writes --extend-run --clobber-outputs --skip-existing
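
# The three invocations above are deliberately spelled out one at a time; an
# equivalent loop (shown only as a sketch) would be:
#   for node in 0 1 2; do
#       refresh_exe
#       pipetask --long-log run -b "$tmp_butler" --output-run "$exerun" \
#           --qgraph "$graph_file" --qgraph-node-id "$node" --skip-init-writes \
#           --extend-run --clobber-outputs --skip-existing
#   done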

# Bring home the datasets by transferring them from the execution butler to
# the main repo.
butler --log-level=VERBOSE --long-log transfer-datasets "$exedir" DATA_REPO --collections "$exerun"

# Create the output chain in the main repo.
# Do this by first appending the input collections and then prepending
# the output run collection. This way we do not need to check for the
# existence of a previous chain.
# If --replace-run were used, an extra --mode=pop step would be needed; see
# the sketch after these commands.
butler collection-chain DATA_REPO --mode=extend "$exeoutput" ${incoll//,/ }
butler collection-chain DATA_REPO --mode=prepend "$exeoutput" "$exerun"
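
# A sketch only (not exercised by this demo): popping the superseded run from
# the front of the chain before prepending its replacement might look like
#   butler collection-chain DATA_REPO --mode=pop "$exeoutput"
# with the exact arguments depending on how the chain was built.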

# Run some tests on the final butler state.
pytest tests/
97 changes: 97 additions & 0 deletions tests/test_butler.py
@@ -0,0 +1,97 @@
# This file is part of pipelines_check.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Output butler from pipelines_check test run."""

import os
import unittest

from lsst.daf.butler import Butler

TESTDIR = os.path.abspath(os.path.dirname(__file__))

# These collection names must match those used in the run_demo.sh
# script.
MAIN_CHAIN = "demo_collection"
EXE_CHAIN = "demo_collection_exe"


class PipelinesCheckTestCase(unittest.TestCase):
"""Check outputs from test run."""

def setUp(self):
"""Create a new butler root for each test."""
root = os.path.join(TESTDIR, os.path.pardir, "DATA_REPO")
self.butler = Butler(root, writeable=False)

def _get_datasets_from_chain(self, chain, datasetType=...):
"""Return all the datasets from the first run in chain.
Datasets are all unresolved.
"""
collections = list(self.butler.registry.queryCollections(chain, flattenChains=True))
run = collections.pop(0)
print(f"Retrieving datasets from run {run}")

unresolved = {ref.unresolved() for ref in self.butler.registry.queryDatasets(datasetType=datasetType,
collections=run)}
return unresolved

def testExecutionButler(self):
"""Check outputs match in both runs."""

# Check that we have identical datasets in both collections
# except for the dataset.id
main_datasets = self._get_datasets_from_chain(MAIN_CHAIN)
exe_datasets = self._get_datasets_from_chain(EXE_CHAIN)
self.assertGreater(len(exe_datasets), 0)
self.assertEqual(len(main_datasets), len(exe_datasets))

difference = main_datasets - exe_datasets
self.assertEqual(len(difference), 0, "Some datasets missing.")

def testExecutionExistence(self):
"""Check that the execution butler files are really there."""

exe_datasets = self._get_datasets_from_chain(EXE_CHAIN)
for ref in exe_datasets:
self.assertTrue(self.butler.datasetExists(ref, collections=EXE_CHAIN))

def testLogDataset(self):
"""Ensure that the logs are captured in both modes."""

log_datasets = self._get_datasets_from_chain(MAIN_CHAIN, datasetType="isr_log")
self.assertEqual(len(log_datasets), 1)

isr_log_ref = log_datasets.pop()

# Get the logs from both main and exe collections.
main_isr_log = self.butler.get("isr_log", dataId=isr_log_ref.dataId, collections=MAIN_CHAIN)
exe_isr_log = self.butler.get("isr_log", dataId=isr_log_ref.dataId, collections=EXE_CHAIN)

        # Timestamps and durations will differ, but check that the number of
        # log messages matches.
self.assertEqual(len(main_isr_log), len(exe_isr_log),
f"Standard log: {main_isr_log}\n vs\nExecution butler log: {exe_isr_log}")


if __name__ == "__main__":
unittest.main()
