Skip to content

Commit

Permalink
Add AssociationTask.
Browse files Browse the repository at this point in the history
The API for AssociationTask currently requires that the database be
set up as a separate step. The code for doing so has been decoupled
from AssociationDBSqliteTask as much as possible to make it easier
to add new DB implementations in the future.
  • Loading branch information
kfindeisen committed Oct 24, 2017
1 parent f278852 commit fbbf4be
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 2 deletions.
96 changes: 94 additions & 2 deletions python/lsst/ap/pipe/ap_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from __future__ import absolute_import, division, print_function

__all__ = ['get_datafiles', 'get_calib_datafiles', 'get_defectfiles', 'get_output_repo',
'doIngest', 'doIngestCalibs', 'doProcessCcd', 'doDiffIm',
'doIngest', 'doIngestCalibs', 'doProcessCcd', 'doDiffIm', 'doAssociation',
'runPipelineAlone']

import os
Expand All @@ -57,6 +57,7 @@
from lsst.utils import getPackageDir
from lsst.ip.diffim.getTemplate import GetCalexpAsTemplateTask
from lsst.pipe.tasks.imageDifference import ImageDifferenceConfig, ImageDifferenceTask
from lsst.ap.association import AssociationDBSqliteTask, AssociationConfig, AssociationTask
import lsst.daf.persistence as dafPersist

# Names of directories containing data products in dataset_root
Expand All @@ -74,6 +75,7 @@
CALIBINGESTED_DIR = 'calibingested'
PROCESSED_DIR = 'processed'
DIFFIM_DIR = 'diffim'
DB_DIR = 'l1db'


def get_datafiles(raw_location):
Expand Down Expand Up @@ -218,6 +220,7 @@ def parsePipelineArgs():
calib_repo = get_output_repo(args.output, CALIBINGESTED_DIR)
processed_repo = get_output_repo(args.output, PROCESSED_DIR)
diffim_repo = get_output_repo(args.output, DIFFIM_DIR)
db_repo = get_output_repo(args.output, DB_DIR)

# Retrieve location of refcats directory containing gaia and pan-starrs tarballs
refcats = os.path.join(args.dataset_root, REFCATS_DIR)
Expand All @@ -230,7 +233,8 @@ def parsePipelineArgs():

repos_and_files = {'repo': repo, 'calib_repo': calib_repo,
'processed_repo': processed_repo,
'diffim_repo': diffim_repo, 'datafiles': datafiles,
'diffim_repo': diffim_repo, 'db_repo': db_repo,
'datafiles': datafiles,
'calib_datafiles': calib_datafiles, 'defectfiles': defectfiles,
'refcats': refcats}
idlist = [args.dataId, template]
Expand Down Expand Up @@ -630,6 +634,92 @@ def doDiffIm(processed_repo, dataId, template, diffim_repo):
return diffim_metadata


def _setupDatabase(configurable):
'''
Set up a database according to a configuration.
Takes no action if the database already exists.
Parameters
----------
configurable: `lsst.pex.config.ConfigurableInstance`
A ConfigurableInstance with a database-managing class in its `target`
field. The API of `target` must expose a `create_tables` method taking
no arguments.
'''
db = configurable.apply()
try:
db.create_tables()
finally:
db.close()


# TODO: duplication of ArgumentParser's internal functionality; remove this in DM-11372
def _deStringDataId(dataId):
'''
Replace a dataId's values with numbers, where appropriate.
Parameters
----------
dataId: `dict`
The dataId to be cleaned up.
'''
integer = re.compile('^\s*[+-]?\d+\s*$')
for key, value in dataId.items():
if isinstance(value, basestring) and integer.match(value) is not None:
dataId[key] = int(value)


def doAssociation(diffim_repo, dataId, db_repo):
'''
Do source association.
Parameters
----------
diffim_repo: `str`
The output repository location on disk where difference images live.
dataId: `str`
Butler identifier naming the data to be processed (e.g., visit and ccdnum)
formatted in the usual way (e.g., 'visit=54321 ccdnum=7').
db_repo: `str`
The output repository location on disk where the source database lives.
Returns
-------
assoc_metadata: `PropertySet` or None
Metadata from the AssociationTask for use by ap_verify
'''
log = lsst.log.Log.getLogger('ap.pipe.doAssociation')
dataId_items = re.split('[ +=]', dataId)
dataId_dict = dict(zip(dataId_items[::2], dataId_items[1::2]))
if 'visit' not in dataId_dict.keys():
raise RuntimeError('The dataId string is missing \'visit\'')
_deStringDataId(dataId_dict)

# No reasonable way to check if Association finished successfully
if not os.path.isdir(db_repo):
os.mkdir(db_repo)

log.info('Running Association...')
config = AssociationConfig()
config.level1_db.retarget(AssociationDBSqliteTask)
config.level1_db.db_name = os.path.join(db_repo, 'association.db')

butler = dafPersist.Butler(inputs=diffim_repo)

_setupDatabase(config.level1_db)

associationTask = AssociationTask(config=config)
try:
catalog = butler.get('deepDiff_diaSrc', dataId=dataId_dict)
exposure = butler.get('deepDiff_differenceExp', dataId=dataId_dict)
associationTask.run(catalog, exposure)
finally:
associationTask.level1_db.close()

return associationTask.getFullMetadata()


def runPipelineAlone():
'''
Run each step of the pipeline. NOT used by ap_verify.
Expand All @@ -646,6 +736,7 @@ def runPipelineAlone():
calib_repo = repos_and_files['calib_repo']
processed_repo = repos_and_files['processed_repo']
diffim_repo = repos_and_files['diffim_repo']
db_repo = repos_and_files['db_repo']

datafiles = repos_and_files['datafiles']
calib_datafiles = repos_and_files['calib_datafiles']
Expand All @@ -664,6 +755,7 @@ def runPipelineAlone():
doProcessCcd(repo, calib_repo, processed_repo, dataId)
doProcessCcd(repo, calib_repo, processed_repo, dataId_template) # temporary
doDiffIm(processed_repo, dataId, template, diffim_repo)
doAssociation(diffim_repo, dataId, db_repo)
log.info('Prototype AP Pipeline run complete.')

return
1 change: 1 addition & 0 deletions ups/ap_pipe.table
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ setupRequired(pex_exceptions >= 4.6.0.0)
setupRequired(ndarray)

setupRequired(obs_decam)
setupRequired(ap_association)

# The following is boilerplate for all packages.
# See Tech Note DMTN-001 for details on LSST_LIBRARY_PATH
Expand Down

0 comments on commit fbbf4be

Please sign in to comment.