DM-25806: Support parallel ap_verify ingestion in Gen 3 #102

Merged: 7 commits, merged Sep 21, 2020
1 change: 1 addition & 0 deletions doc/lsst.ap.verify/command-line-reference.rst
@@ -101,6 +101,7 @@ Required arguments are :option:`--dataset` and :option:`--output`.
**Number of processes to use.**

When ``processes`` is larger than 1 the pipeline may use the Python `multiprocessing` module to parallelize processing of multiple datasets across multiple processors.
In Gen 3 mode, data ingestion may also be parallelized.

.. option:: --image-metrics-config <filename>

2 changes: 1 addition & 1 deletion doc/lsst.ap.verify/running.rst
@@ -92,7 +92,7 @@ Using the `HiTS 2015 <https://github.com/lsst/ap_verify_hits2015/>`_ dataset as

ingest_dataset.py --dataset HiTS2015 --gen2 --output workspaces/hits/

The :option:`--dataset`, :option:`--output`, :option:`--gen2`, and :option:`--gen3` arguments behave the same way as for :command:`ap_verify.py`.
The :option:`--dataset`, :option:`--output`, :option:`--gen2`, :option:`--gen3`, and :option:`--processes` arguments behave the same way as for :command:`ap_verify.py`.
Other options from :command:`ap_verify.py` are not available.
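
With this change, ingestion-only runs can be parallelized as well. A hypothetical invocation mirroring the HiTS example above (the process count of 4 is illustrative):

ingest_dataset.py --dataset HiTS2015 --gen3 --output workspaces/hits/ --processes 4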

.. _ap-verify-results:
26 changes: 20 additions & 6 deletions python/lsst/ap/verify/ap_verify.py
@@ -65,6 +65,20 @@ def __init__(self):
help='Handle the ap_verify dataset using the Gen 3 framework.')


class _ProcessingParser(argparse.ArgumentParser):
"""An argument parser for general run-time characteristics.

This parser is not complete, and is designed to be passed to another parser
using the `parent` parameter.
"""

def __init__(self):
# Help and documentation will be handled by main program's parser
argparse.ArgumentParser.__init__(self, add_help=False)
self.add_argument("-j", "--processes", default=1, type=int,
help="Number of processes to use.")


class _ApVerifyParser(argparse.ArgumentParser):
"""An argument parser for data needed by the main ap_verify program.
"""
@@ -74,7 +88,7 @@ def __init__(self):
self,
description='Executes the LSST DM AP pipeline and analyzes its performance using metrics.',
epilog='',
parents=[_InputOutputParser(), ApPipeParser(), MetricsParser()],
parents=[_InputOutputParser(), _ProcessingParser(), ApPipeParser(), MetricsParser()],
add_help=True)


@@ -92,7 +106,7 @@ def __init__(self):
'passing the same --output argument, or by other programs that accept '
'Butler repositories as input.',
epilog='',
parents=[_InputOutputParser()],
parents=[_InputOutputParser(), _ProcessingParser()],
add_help=True)
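
The `parents` mechanism used by these parsers is standard argparse composition: a partial parser built with `add_help=False` contributes its arguments to any parser that lists it. A minimal, self-contained sketch of the pattern (illustrative, not code from this PR):

import argparse

class ProcessingParser(argparse.ArgumentParser):
    """Partial parser; add_help=False leaves -h/--help to the child parser."""
    def __init__(self):
        super().__init__(add_help=False)
        self.add_argument("-j", "--processes", default=1, type=int,
                          help="Number of processes to use.")

parser = argparse.ArgumentParser(parents=[ProcessingParser()])
args = parser.parse_args(["-j", "4"])
assert args.processes == 4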


@@ -164,15 +178,15 @@ def runApVerify(cmdLine=None):

if args.useGen3:
workspace = WorkspaceGen3(args.output)
ingestDatasetGen3(args.dataset, workspace)
ingestDatasetGen3(args.dataset, workspace, processes=args.processes)
log.info('Running pipeline...')
# Gen 3 pipeline includes both AP and metrics
return runApPipeGen3(workspace, args)
return runApPipeGen3(workspace, args, processes=args.processes)
else:
workspace = WorkspaceGen2(args.output)
ingestDataset(args.dataset, workspace)
log.info('Running pipeline...')
apPipeResults = runApPipeGen2(workspace, args)
apPipeResults = runApPipeGen2(workspace, args, processes=args.processes)
computeMetrics(workspace, apPipeResults.parsedCmd.id, args)
return _getCmdLineExitStatus(apPipeResults.resultList)

@@ -221,7 +235,7 @@ def runIngestion(cmdLine=None):

if args.useGen3:
workspace = WorkspaceGen3(args.output)
ingestDatasetGen3(args.dataset, workspace)
ingestDatasetGen3(args.dataset, workspace, processes=args.processes)
else:
workspace = WorkspaceGen2(args.output)
ingestDataset(args.dataset, workspace)
13 changes: 13 additions & 0 deletions python/lsst/ap/verify/dataset.py
@@ -55,6 +55,7 @@ class Dataset:
"""

def __init__(self, datasetId):
self._id = datasetId
# daf.persistence.Policy's behavior on missing keys is apparently undefined
# test for __getattr__ *either* raising KeyError or returning None
try:
@@ -230,6 +231,18 @@ def _validatePackage(self):
if not _isRepo(self._stubInputRepo):
raise RuntimeError('Stub repo at ' + self._stubInputRepo + ' is missing mapper file')

def __eq__(self, other):
"""Test that two Dataset objects are equal.

Two objects are equal iff they refer to the same ap_verify dataset.
"""
return self.datasetRoot == other.datasetRoot

def __repr__(self):
"""A string representation that can be used to reconstruct the dataset.
"""
return f"Dataset({self._id!r})"

def makeCompatibleRepo(self, repoDir, calibRepoDir):
"""Set up a directory as a Gen 2 repository compatible with this ap_verify dataset.

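
Together, the new `__eq__` and `__repr__` give `Dataset` a reconstruction contract that the new `testRepr` test below verifies textually. A hedged usage sketch, assuming the `HiTS2015` ap_verify dataset package is set up locally:

from lsst.ap.verify.dataset import Dataset

ds = Dataset("HiTS2015")     # raises if the dataset package is not installed
assert eval(repr(ds)) == ds  # repr yields "Dataset('HiTS2015')", reconstructing an equal object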
47 changes: 36 additions & 11 deletions python/lsst/ap/verify/ingestion.py
@@ -463,20 +463,35 @@ def __init__(self, dataset, workspace, *args, **kwargs):
self.makeSubtask("ingester", butler=self.workspace.workButler)
self.makeSubtask("visitDefiner", butler=self.workspace.workButler)

def run(self):
# Overrides Task.__reduce__
def __reduce__(self):
baseArgs = super().__reduce__()[1]
return (self.__class__, (self.dataset, self.workspace, *baseArgs))
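
The `__reduce__` override above matters for parallel ingestion: `multiprocessing` pickles objects sent to worker processes, and the base `Task.__reduce__` does not know about the extra `dataset` and `workspace` constructor arguments, so an unpickled task would be rebuilt incorrectly. A standalone illustration of the pattern (generic Python, not LSST code):

import pickle

class Base:
    def __init__(self, config):
        self.config = config

    def __reduce__(self):
        # pickle calls __reduce__ to obtain (callable, args) for reconstruction
        return (self.__class__, (self.config,))

class Derived(Base):
    def __init__(self, extra, config):
        super().__init__(config)
        self.extra = extra

    def __reduce__(self):
        # prepend this class's extra constructor arguments to the base's
        baseArgs = super().__reduce__()[1]
        return (self.__class__, (self.extra, *baseArgs))

clone = pickle.loads(pickle.dumps(Derived("dataset", {"n": 1})))
assert clone.extra == "dataset" and clone.config == {"n": 1}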

def run(self, processes=1):
"""Ingest the contents of a dataset into a Butler repository.

Parameters
----------
processes : `int`
The number of processes to use for ingestion.
"""
self._ensureRaws()
self._defineVisits()
self._ensureRaws(processes=processes)
self._defineVisits(processes=processes)
self._copyConfigs()

def _ensureRaws(self):
def _ensureRaws(self, processes):
"""Ensure that the repository in ``workspace`` has raws ingested.

After this method returns, this task's repository contains all science
data from this task's ap_verify dataset. Butler operations on the
repository are not able to modify ``dataset`` in any way.

Parameters
----------
processes : `int`
The number of processes to use for ingestion, if ingestion must be run.

Raises
------
RuntimeError
@@ -493,12 +508,12 @@ def _ensureRaws(self):
dataFiles = _findMatchingFiles(self.dataset.rawLocation, self.config.dataFiles,
exclude=self.config.dataBadFiles)
if dataFiles:
self._ingestRaws(dataFiles)
self._ingestRaws(dataFiles, processes=processes)
self.log.info("Images are now ingested in {0}".format(self.workspace.repo))
else:
raise RuntimeError("No raw files found at %s." % self.dataset.rawLocation)

def _ingestRaws(self, dataFiles):
def _ingestRaws(self, dataFiles, processes):
"""Ingest raw images into a repository.

This task's repository is populated with *links* to ``dataFiles``.
@@ -507,6 +522,8 @@ def _ingestRaws(self, dataFiles):
----------
dataFiles : `list` of `str`
A list of filenames to ingest. May contain wildcards.
processes : `int`
The number of processes to use for ingestion.

Raises
------
@@ -517,15 +534,21 @@ def _ingestRaws(self, dataFiles):
raise RuntimeError("No raw files to ingest (expected list of filenames, got %r)." % dataFiles)

try:
self.ingester.run(dataFiles, run=None) # expect ingester to name a new collection
# run=None because the ingester is expected to name a new collection
self.ingester.run(dataFiles, run=None, processes=processes)
except lsst.daf.butler.registry.ConflictingDefinitionError as detail:
raise RuntimeError("Not all raw files are unique") from detail

def _defineVisits(self):
def _defineVisits(self, processes):
"""Map visits to the ingested exposures.

This step is necessary to be able to run most pipelines on raw datasets.

Parameters
----------
processes : `int`
The number of processes to use to define visits.

Raises
------
RuntimeError
@@ -541,7 +564,7 @@ def _defineVisits(self):
exposuresNoVisits = exposures - exposuresWithVisits
if exposuresNoVisits:
self.log.info("Defining visits...")
self.visitDefiner.run(exposuresNoVisits)
self.visitDefiner.run(exposuresNoVisits, processes=processes)
else:
self.log.info("Visits were previously defined, skipping...")

@@ -584,7 +607,7 @@ def ingestDataset(dataset, workspace):
log.info("Data ingested")


def ingestDatasetGen3(dataset, workspace):
def ingestDatasetGen3(dataset, workspace, processes=1):
"""Ingest the contents of an ap_verify dataset into a Gen 3 Butler repository.

The original data directory is not modified.
@@ -596,11 +619,13 @@ def ingestDatasetGen3(dataset, workspace):
workspace : `lsst.ap.verify.workspace.WorkspaceGen3`
The abstract location where the repository is to be created, if it does
not already exist.
processes : `int`
The number of processes to use for ingestion.
"""
log = lsst.log.Log.getLogger("ap.verify.ingestion.ingestDataset")

ingester = Gen3DatasetIngestTask(dataset, workspace, config=_getConfig(Gen3DatasetIngestTask, dataset))
ingester.run()
ingester.run(processes=processes)
log.info("Data ingested")


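
Putting the ingestion changes together, a hypothetical direct call (the dataset name and path are placeholders; `runIngestion` normally supplies these arguments from the command line):

from lsst.ap.verify.dataset import Dataset
from lsst.ap.verify.ingestion import ingestDatasetGen3
from lsst.ap.verify.workspace import WorkspaceGen3

dataset = Dataset("HiTS2015")                 # placeholder dataset name
workspace = WorkspaceGen3("workspaces/hits")  # repository is created here if absent
ingestDatasetGen3(dataset, workspace, processes=4)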
14 changes: 8 additions & 6 deletions python/lsst/ap/verify/pipeline_driver.py
@@ -59,8 +59,6 @@ def __init__(self):
help='An identifier for the data to process.')
self.add_argument("-p", "--pipeline", default=defaultPipeline,
help="A custom version of the ap_verify pipeline (e.g., with different metrics).")
self.add_argument("-j", "--processes", default=1, type=int,
help="Number of processes to use.")
self.add_argument("--skip-pipeline", action="store_true",
help="Do not run the AP pipeline itself. This argument is useful "
"for testing metrics on a fixed data set.")
@@ -80,7 +78,7 @@ def __call__(self, parser, namespace, values, option_string=None):
setattr(namespace, self.dest, [values])


def runApPipeGen2(workspace, parsedCmdLine):
def runApPipeGen2(workspace, parsedCmdLine, processes=1):
"""Run `ap_pipe` on this object's dataset.

Parameters
@@ -89,6 +87,8 @@ def runApPipeGen2(workspace, parsedCmdLine):
The abstract location containing input and output repositories.
parsedCmdLine : `argparse.Namespace`
Command-line arguments, including all arguments supported by `ApPipeParser`.
processes : `int`
The number of processes to use when running the AP pipeline.

Returns
-------
@@ -112,7 +112,7 @@
pipelineArgs.extend(["--id", *singleId.split(" ")])
else:
pipelineArgs.extend(["--id"])
pipelineArgs.extend(["--processes", str(parsedCmdLine.processes)])
pipelineArgs.extend(["--processes", str(processes)])
pipelineArgs.extend(["--noExit"])

if not parsedCmdLine.skip_pipeline:
@@ -132,7 +132,7 @@
return results


def runApPipeGen3(workspace, parsedCmdLine):
def runApPipeGen3(workspace, parsedCmdLine, processes=1):
"""Run `ap_pipe` on this object's dataset.

Parameters
Expand All @@ -141,6 +141,8 @@ def runApPipeGen3(workspace, parsedCmdLine):
The abstract location containing input and output repositories.
parsedCmdLine : `argparse.Namespace`
Command-line arguments, including all arguments supported by `ApPipeParser`.
processes : `int`
The number of processes to use when running the AP pipeline.
"""
log = lsst.log.Log.getLogger('ap.verify.pipeline_driver.runApPipeGen3')

@@ -159,7 +161,7 @@
if parsedCmdLine.dataIds:
for singleId in parsedCmdLine.dataIds:
pipelineArgs.extend(["--data-query", singleId])
pipelineArgs.extend(["--processes", str(parsedCmdLine.processes)])
pipelineArgs.extend(["--processes", str(processes)])
pipelineArgs.extend(["--register-dataset-types"])

if not parsedCmdLine.skip_pipeline:
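
A note on the design: hoisting `-j`/`--processes` out of `ApPipeParser` into the shared `_ProcessingParser` is what lets `ingest_dataset.py`, which does not use `ApPipeParser`, accept the option; the drivers now receive the count explicitly instead of reading `parsedCmdLine.processes`. A minimal sketch of the new calling convention (the value 4 is illustrative):

# parsedCmdLine no longer needs a .processes attribute
status = runApPipeGen3(workspace, parsedCmdLine, processes=4)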
11 changes: 11 additions & 0 deletions python/lsst/ap/verify/workspace.py
@@ -78,6 +78,17 @@ def mkdir(directory):
mode = stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH # a+r, u+rwx
pathlib.Path(directory).mkdir(parents=True, exist_ok=True, mode=mode)

def __eq__(self, other):
"""Test whether two workspaces are of the same type and have the
same location.
"""
return type(self) == type(other) and self.workDir == other.workDir

def __repr__(self):
"""A string representation that can be used to reconstruct the Workspace.
"""
return f"{type(self).__name__}({self.workDir!r})"

@property
def workDir(self):
"""The absolute location of the workspace as a whole
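
The type check in `__eq__` means two workspaces rooted at the same directory but of different generations compare unequal. A hedged sketch (assumes both subclasses take a directory path, as seen elsewhere in this PR, and note that constructing a workspace creates its directories):

from lsst.ap.verify.workspace import WorkspaceGen2, WorkspaceGen3

gen2 = WorkspaceGen2("workspaces/hits")
gen3 = WorkspaceGen3("workspaces/hits")
assert gen2 != gen3                            # same location, different type
assert gen3 == WorkspaceGen3("workspaces/hits")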
4 changes: 4 additions & 0 deletions tests/test_dataset.py
@@ -45,6 +45,10 @@ def setUpClass(cls):
def setUp(self):
self._testbed = Dataset(DatasetTestSuite.datasetKey)

def testRepr(self):
# Required to match constructor call
self.assertEqual(repr(self._testbed), "Dataset(" + repr(self.datasetKey) + ")")

def testDatasets(self):
"""Verify that a Dataset knows its supported datasets.
"""