Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-26375: Allow databases other than SQLite when running ap_verify #113

Merged
merged 3 commits into from
Nov 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions doc/lsst.ap.verify/command-line-reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ Required arguments are :option:`--dataset` and :option:`--output`.
For the Gen 3 equivalent to this option, see :option:`--pipeline`.
See also :doc:`new-metrics`.

.. option:: --db, --db_url

**Target Alert Production Database**

A URI string identifying the database in which to store source associations.
The string must be in the format expected by `lsst.dax.apdb.ApdbConfig.db_url`, i.e. an SQLAlchemy connection string.
If the indicated database does not exist, it is created, provided that creating it is appropriate for the database type.

If this argument is omitted, ``ap_verify`` creates an SQLite database inside the directory indicated by :option:`--output`.

.. option:: --gen2
.. option:: --gen3

Expand Down
44 changes: 28 additions & 16 deletions python/lsst/ap/verify/pipeline_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ def __init__(self):
help='An identifier for the data to process.')
self.add_argument("-p", "--pipeline", default=defaultPipeline,
help="A custom version of the ap_verify pipeline (e.g., with different metrics).")
self.add_argument("--db", "--db_url", default=None,
help="A location for the AP database, formatted as if for ApdbConfig.db_url. "
"Defaults to an SQLite file in the --output directory.")
self.add_argument("--skip-pipeline", action="store_true",
help="Do not run the AP pipeline itself. This argument is useful "
"for testing metrics on a fixed data set.")
Expand Down Expand Up @@ -103,13 +106,13 @@ def runApPipeGen2(workspace, parsedCmdLine, processes=1):
"""
log = lsst.log.Log.getLogger('ap.verify.pipeline_driver.runApPipeGen2')

makeApdb(_getApdbArguments(workspace))
makeApdb(_getApdbArguments(workspace, parsedCmdLine))

pipelineArgs = [workspace.dataRepo,
"--output", workspace.outputRepo,
"--calib", workspace.calibRepo,
"--template", workspace.templateRepo]
pipelineArgs.extend(_getConfigArguments(workspace))
pipelineArgs.extend(_getConfigArguments(workspace, parsedCmdLine))
if parsedCmdLine.dataIds:
for singleId in parsedCmdLine.dataIds:
pipelineArgs.extend(["--id", *singleId.split(" ")])
Expand Down Expand Up @@ -149,8 +152,7 @@ def runApPipeGen3(workspace, parsedCmdLine, processes=1):
"""
log = lsst.log.Log.getLogger('ap.verify.pipeline_driver.runApPipeGen3')

# Currently makeApdb has different argument conventions from Gen 3; see DM-22663
makeApdb(_getApdbArguments(workspace))
makeApdb(_getApdbArguments(workspace, parsedCmdLine))

pipelineArgs = ["run",
"--butler-config", workspace.repo,
Expand All @@ -160,7 +162,7 @@ def runApPipeGen3(workspace, parsedCmdLine, processes=1):
# but I can't find a way to hook that up to the graph builder. So use the CLI
# for now and revisit once DM-26239 is done.
pipelineArgs.extend(_getCollectionArguments(workspace))
pipelineArgs.extend(_getConfigArgumentsGen3(workspace))
pipelineArgs.extend(_getConfigArgumentsGen3(workspace, parsedCmdLine))
if parsedCmdLine.dataIds:
for singleId in parsedCmdLine.dataIds:
pipelineArgs.extend(["--data-query", singleId])
Expand All @@ -182,30 +184,36 @@ def runApPipeGen3(workspace, parsedCmdLine, processes=1):
log.info('Skipping AP pipeline entirely.')


def _getApdbArguments(workspace, parsed):
    """Return the config options for running make_apdb.py on this workspace,
    as command-line arguments.

    Parameters
    ----------
    workspace : `lsst.ap.verify.workspace.Workspace`
        A Workspace whose config directory may contain an
        `~lsst.ap.pipe.ApPipeTask` config. Its ``dbLocation`` is used to
        build the default SQLite URL when no database is given.
    parsed : `argparse.Namespace`
        Command-line arguments, including all arguments supported by `ApPipeParser`.
        Only ``parsed.db`` is read; the namespace is not modified.

    Returns
    -------
    args : `list` of `str`
        Command-line arguments calling ``--config``,
        following the conventions of `sys.argv`.
    """
    # Resolve the default in a local variable instead of writing it back to
    # ``parsed``, so that computing the arguments has no side effects on the
    # caller's namespace.
    dbUrl = parsed.db if parsed.db else "sqlite:///" + workspace.dbLocation

    args = ["--config", "db_url=" + dbUrl]
    # Same special-case check as ApdbConfig.validate()
    if dbUrl.startswith("sqlite"):
        args.extend(["--config", "isolation_level=READ_UNCOMMITTED"])

    return args

def _getConfigArguments(workspace):

def _getConfigArguments(workspace, parsed):
"""Return the config options for running ApPipeTask on this workspace, as
command-line arguments.

Expand All @@ -214,6 +222,8 @@ def _getConfigArguments(workspace):
workspace : `lsst.ap.verify.workspace.WorkspaceGen2`
A Workspace whose config directory may contain an
`~lsst.ap.pipe.ApPipeTask` config.
parsed : `argparse.Namespace`
Command-line arguments, including all arguments supported by `ApPipeParser`.

Returns
-------
Expand All @@ -227,22 +237,24 @@ def _getConfigArguments(workspace):
args = ["--configfile", overridePath]
# Translate APDB-only arguments to work as a sub-config
args.extend([("diaPipe.apdb." + arg if arg != "--config" else arg)
for arg in _getApdbArguments(workspace)])
for arg in _getApdbArguments(workspace, parsed)])
# Put output alerts into the workspace.
args.extend(["--config", "diaPipe.alertPackager.alertWriteLocation=" + workspace.alertLocation])
args.extend(["--config", "diaPipe.doPackageAlerts=True"])

return args


def _getConfigArgumentsGen3(workspace):
def _getConfigArgumentsGen3(workspace, parsed):
"""Return the config options for running the Gen 3 AP Pipeline on this
workspace, as command-line arguments.

Parameters
----------
workspace : `lsst.ap.verify.workspace.WorkspaceGen3`
A Workspace whose config directory may contain various configs.
parsed : `argparse.Namespace`
Command-line arguments, including all arguments supported by `ApPipeParser`.

Returns
-------
Expand All @@ -252,7 +264,7 @@ def _getConfigArgumentsGen3(workspace):
"""
# Translate APDB-only arguments to work as a sub-config
args = [("diaPipe:apdb." + arg if arg != "--config" else arg)
for arg in _getApdbArguments(workspace)]
for arg in _getApdbArguments(workspace, parsed)]
args.extend([
# Put output alerts into the workspace.
"--config", "diaPipe:alertPackager.alertWriteLocation=" + workspace.alertLocation,
Expand Down
33 changes: 33 additions & 0 deletions tests/test_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,22 @@ def testrunApPipeGen2WorkspaceDb(self, mockDb, mockClass):
cmdLineArgs = self._getCmdLineArgs(mockParse.call_args)
self.assertIn("diaPipe.apdb.db_url=sqlite:///" + self.workspace.dbLocation, cmdLineArgs)

@patchApPipe
def testrunApPipeGen2WorkspaceDbCustom(self, mockDb, mockClass):
    """Check that runApPipeGen2 forwards a user-specified database URL to
    both the APDB creation step and the pipeline invocation.
    """
    self.apPipeArgs.db = "postgresql://somebody@pgdb.misc.org/custom_db"
    # Grab the mocked entry point before running, as it is an attribute of
    # the patched class rather than of an instance.
    parseAndRun = mockClass.parseAndRun
    pipeline_driver.runApPipeGen2(self.workspace, self.apPipeArgs)

    # The database must be created exactly once, at the custom URL.
    mockDb.assert_called_once()
    apdbArgs = self._getCmdLineArgs(mockDb.call_args)
    self.assertIn(f"db_url={self.apPipeArgs.db}", apdbArgs)

    # The pipeline must be pointed at the same URL through its sub-config.
    parseAndRun.assert_called_once()
    pipelineArgs = self._getCmdLineArgs(parseAndRun.call_args)
    self.assertIn(f"diaPipe.apdb.db_url={self.apPipeArgs.db}", pipelineArgs)

@patchApPipe
def testrunApPipeGen2Reuse(self, _mockDb, mockClass):
"""Test that runApPipeGen2 does not run the pipeline at all (not even with
Expand Down Expand Up @@ -270,6 +286,23 @@ def testrunApPipeGen3WorkspaceDb(self, mockDb, mockFwk):
cmdLineArgs = self._getCmdLineArgs(mockParse.call_args)
self.assertIn("diaPipe:apdb.db_url=sqlite:///" + self.workspace.dbLocation, cmdLineArgs)

@unittest.skip("Fix test in DM-27117")
@patchApPipeGen3
def testrunApPipeGen3WorkspaceCustom(self, mockDb, mockFwk):
    """Check that runApPipeGen3 forwards a user-specified database URL to
    both the APDB creation step and the pipeline invocation.
    """
    self.apPipeArgs.db = "postgresql://somebody@pgdb.misc.org/custom_db"
    pipeline_driver.runApPipeGen3(self.workspace, self.apPipeArgs)

    # The database must be created exactly once, at the custom URL.
    mockDb.assert_called_once()
    apdbArgs = self._getCmdLineArgs(mockDb.call_args)
    self.assertIn(f"db_url={self.apPipeArgs.db}", apdbArgs)

    # Look up the mocked entry point only after the run, on the instance
    # returned by calling the patched framework class.
    parseAndRun = mockFwk().parseAndRun
    parseAndRun.assert_called_once()
    pipelineArgs = self._getCmdLineArgs(parseAndRun.call_args)
    self.assertIn(f"diaPipe:apdb.db_url={self.apPipeArgs.db}", pipelineArgs)

@unittest.skip("Fix test in DM-27117")
@patchApPipeGen3
def testrunApPipeGen3Reuse(self, _mockDb, mockFwk):
Expand Down