Skip to content

Commit

Permalink
Remove hardcoded database location.
Browse files Browse the repository at this point in the history
Because ApPipeTask can't work with in-memory databases, users must
provide a location themselves. The documentation has been
updated accordingly.
  • Loading branch information
kfindeisen committed Aug 3, 2018
1 parent 03d4769 commit 450df32
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 134 deletions.
15 changes: 10 additions & 5 deletions doc/lsst.ap.pipe/pipeline-tutorial.rst
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ To process your ingested data, run

.. prompt:: bash

ap_pipe.py repo --calib repo/calibs --rerun processed --id visit=123456 ccdnum=42 filter=g --template templates
ap_pipe.py repo --calib repo/calibs --rerun processed -c associator.level1_db.db_name=association.db --id visit=123456 ccdnum=42 filter=g --template templates

In this case, a ``processed`` directory will be created within
``repo/rerun`` and the results will be written there.
Expand All @@ -86,12 +86,17 @@ If you prefer to have a standalone output repository, you may instead run

.. prompt:: bash

ap_pipe.py repo --calib repo/calibs --output path/to/put/processed/data/in --id visit=123456 ccdnum=42 filter=g --template path/to/templates
ap_pipe.py repo --calib repo/calibs --output path/to/put/processed/data/in -c associator.level1_db.db_name=association.db --id visit=123456 ccdnum=42 filter=g --template path/to/templates

In this case, the output directory will be created if it does not already exist.
If you omit the ``--template`` flag, ``ap_pipe`` will assume the templates are
somewhere in ``repo``.

.. note::

If you are using the default (SQLite) association database, you must :ref:`configure <command-line-task-config-howto>` the database location, or ``ap_pipe`` will not run.
The location is a path to a new or existing database file to be used for source associations (including associations with previously known objects, if the database already exists).
In the examples above, it is configured with the ``-c`` option, but a personal config file may be more convenient if you intend to run ``ap_pipe`` many times.

.. _section-ap-pipe-expected-outputs:

Expand All @@ -104,11 +109,11 @@ something like

.. code-block:: none
association.db <--- the Prompt Products Database with DIAObjects
repo/
rerun/
processed/
repositoryCfg.yaml
association.db <--- the Prompt Products Database with DIAObjects
deepDiff/
v123456/ <--- difference images and DIASource tables are in here
123456/ <--- all other processed data products are in here (calexps etc.)
Expand Down Expand Up @@ -154,7 +159,7 @@ A full command looks like

.. prompt:: bash

ap_pipe.py repo --calib repo/calibs --rerun processed -C $AP_PIPE_DIR/config/calexpTemplates.py --id visit=123456 ccdnum=42 filter=g --template /path/to/calexp/templates --templateId visit=234567
ap_pipe.py repo --calib repo/calibs --rerun processed -C $AP_PIPE_DIR/config/calexpTemplates.py -c associator.level1_db.db_name=association.db --id visit=123456 ccdnum=42 filter=g --template /path/to/calexp/templates --templateId visit=234567


.. _section-ap-pipe-supplemental-info:
Expand Down Expand Up @@ -224,7 +229,7 @@ calibrated exposures, difference images, inspect DIAObjects and/or DIASources, e
# Open and read all data from the association database
sqliteFile = 'association.db'
connection = sqlite3.connect(os.path.join(workingDir, sqliteFile))
connection = sqlite3.connect(sqliteFile)
tables = {'obj': 'dia_objects', 'src': 'dia_sources', 'con': 'dia_objects_to_dia_sources'}
conTable = pd.read_sql_query('select * from {0};'.format(tables['con']), connection)
objTable = pd.read_sql_query('select * from {0};'.format(tables['obj']), connection)
Expand Down
113 changes: 3 additions & 110 deletions python/lsst/ap/pipe/apPipeTaskRunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,125 +23,18 @@

__all__ = ["ApPipeTaskRunner"]

import os
import sys
import traceback

import lsst.log
import lsst.pipe.base as pipeBase


class ApPipeTaskRunner(pipeBase.ButlerInitializedTaskRunner):

def makeTask(self, parsedCmd=None, args=None):
    """Construct an ApPipeTask with both a Butler and a database.

    ``parsedCmd`` takes precedence over ``args`` when both are given.

    Parameters
    ----------
    parsedCmd : `argparse.Namespace`
        Parsed command-line options, as returned by the
        `~lsst.pipe.base.ArgumentParser`; if specified then ``args``
        is ignored.
    args
        Args tuple passed to `TaskRunner.__call__`. First argument must be
        a path to the database file and second argument must be a dataref.

    Returns
    -------
    task : ``self.TaskClass``
        A task built from this runner's config and log, the Butler, and
        the association database file.

    Raises
    ------
    RuntimeError
        Raised if ``parsedCmd`` and ``args`` are both `None`.
    """
    # Guard clause: neither source of arguments was supplied.
    if parsedCmd is None and args is None:
        raise RuntimeError("parsedCmd or args must be specified")

    if parsedCmd is not None:
        # Command-line invocation: the DB lives in the output repository.
        butler = parsedCmd.butler
        dbFile = os.path.join(parsedCmd.output, "association.db")
    else:
        # Called from TaskRunner.__call__: unpack (dbFile, dataRef, _).
        dbFile, dataRef, _ = args
        butler = dataRef.butlerSubset.butler

    return self.TaskClass(config=self.config, log=self.log, butler=butler, dbFile=dbFile)

@staticmethod
def getTargetList(parsedCmd, **kwargs):
"""Get a list of (dbFile, rawRef, kwargs) for `TaskRunner.__call__`.
"""Get a list of (rawRef, kwargs) for `TaskRunner.__call__`.
"""
# Hack to allow makeTask(args). Remove once DM-11767 (or possibly DM-13672) resolved
dbFile = os.path.join(parsedCmd.output, "association.db")
argDict = dict(
return pipeBase.ButlerInitializedTaskRunner.getTargetList(
parsedCmd,
templateIds=parsedCmd.templateId.idList,
reuse=parsedCmd.reuse,
**kwargs
)
return [(dbFile, dataRef, argDict) for dataRef in parsedCmd.id.refList]

# TODO: workaround for DM-11767 or DM-13672; can remove once ApPipeTask.__init__ no longer needs dbFile
# TODO: find a way to pass the DB argument that doesn't require duplicating TaskRunner.__call__
def __call__(self, args):
    """Run the Task on a single target.

    Parameters
    ----------
    args
        A path to the database file, followed by arguments for Task.run().

    Returns
    -------
    struct : `lsst.pipe.base.Struct`
        Contains these fields if ``doReturnResults`` is `True`:

        - ``dataRef``: the provided data reference.
        - ``metadata``: task metadata after execution of run.
        - ``result``: result returned by task run, or `None` if the task fails.
        - ``exitStatus``: 0 if the task completed successfully, 1 otherwise.

        If ``doReturnResults`` is `False` the struct contains:

        - ``exitStatus``: 0 if the task completed successfully, 1 otherwise.
    """
    # The leading element (the DB file path) is consumed by makeTask below,
    # which receives the full tuple via args=args.
    _, dataRef, kwargs = args
    if self.log is None:
        self.log = lsst.log.Log.getDefaultLogger()
    # Tag all log messages with the data ID(s) being processed.
    if hasattr(dataRef, "dataId"):
        self.log.MDC("LABEL", str(dataRef.dataId))
    elif isinstance(dataRef, (list, tuple)):
        self.log.MDC("LABEL", str([ref.dataId for ref in dataRef if hasattr(ref, "dataId")]))
    task = self.makeTask(args=args)
    result = None  # in case the task fails
    exitStatus = 0  # exit status for the shell
    if self.doRaise:
        # --doraise mode: let any exception propagate to the caller.
        result = task.run(dataRef, **kwargs)
    else:
        try:
            result = task.run(dataRef, **kwargs)
        except Exception as e:
            # The shell exit value will be the number of dataRefs returning
            # non-zero, so the actual value used here is lost.
            exitStatus = 1

            # don't use a try block as we need to preserve the original exception
            eName = type(e).__name__
            if hasattr(dataRef, "dataId"):
                task.log.fatal("Failed on dataId=%s: %s: %s", dataRef.dataId, eName, e)
            elif isinstance(dataRef, (list, tuple)):
                task.log.fatal("Failed on dataIds=[%s]: %s: %s",
                               ", ".join(str(ref.dataId) for ref in dataRef), eName, e)
            else:
                task.log.fatal("Failed on dataRef=%s: %s: %s", dataRef, eName, e)

            # TaskError indicates an anticipated failure; anything else gets
            # a full traceback for debugging.
            if not isinstance(e, pipeBase.TaskError):
                traceback.print_exc(file=sys.stderr)
    # Persist task metadata whether or not run() succeeded.
    task.writeMetadata(dataRef)

    # remove MDC so it does not show up outside of task context
    self.log.MDCRemove("LABEL")

    if self.doReturnResults:
        return pipeBase.Struct(
            exitStatus=exitStatus,
            dataRef=dataRef,
            metadata=task.metadata,
            result=result,
        )
    else:
        return pipeBase.Struct(
            exitStatus=exitStatus,
        )
26 changes: 7 additions & 19 deletions python/lsst/ap/pipe/ap_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ def validate(self):
raise ValueError("Source association needs difference exposures "
"[differencer.doWriteSubtractedExp].")

# Not all level1_db implementations let you specify a DB location
if hasattr(self.associator.level1_db, "db_name") and self.associator.level1_db.db_name == ":memory:":
raise ValueError("Source association needs a persistent database [associator.level1_db.db_name]. "
"Please provide a DB file (need not exist) or use a different level1_db Task.")


class ApPipeTask(pipeBase.CmdLineTask):
"""Command-line task representing the entire AP pipeline.
Expand All @@ -133,38 +138,21 @@ class ApPipeTask(pipeBase.CmdLineTask):
``config.differencer.getTemplate`` is overridden) template data to
be processed. Its output repository must be both readable
and writable.
dbFile : `str`
The filename where the source association database lives. Will be
created if it does not yet exist.
config : `ApPipeConfig`, optional
A configuration for this task.
"""

ConfigClass = ApPipeConfig
RunnerClass = ApPipeTaskRunner
_DefaultName = "apPipe"

# TODO: dbFile is a workaround for DM-11767
def __init__(self, butler, dbFile, config=None, *args, **kwargs):
# TODO: hacky workaround for DM-13602
modConfig = ApPipeTask._copyConfig(config) if config is not None else ApPipeTask.ConfigClass()
modConfig.associator.level1_db.db_name = dbFile
modConfig.freeze()
pipeBase.CmdLineTask.__init__(self, *args, config=modConfig, **kwargs)
def __init__(self, butler, *args, **kwargs):
pipeBase.CmdLineTask.__init__(self, *args, **kwargs)

self.makeSubtask("ccdProcessor", butler=butler)
self.makeSubtask("differencer", butler=butler)
# Must be called before AssociationTask.__init__
_setupDatabase(self.config.associator.level1_db)
self.makeSubtask("associator")

# TODO: hack for modifying frozen configs; delete once DM-13602 resolved
@staticmethod
def _copyConfig(config):
    """Return a fresh, modifiable copy of ``config``.

    Parameters
    ----------
    config
        The configuration to duplicate — presumably an
        `lsst.pex.config.Config`; confirm against callers. Only its
        top-level items are copied; nested values are shared with
        the original.

    Returns
    -------
    copy
        A new instance of ``type(config)`` constructed from the
        original's top-level key/value pairs.
    """
    configClass = type(config)
    contents = {key: value for (key, value) in config.items()}  # Force non-recursive conversion
    return configClass(**contents)

@pipeBase.timeMethod
def run(self, rawRef, templateIds=None, reuse=None):
"""Execute the ap_pipe pipeline on a single image.
Expand Down

0 comments on commit 450df32

Please sign in to comment.