Skip to content

Commit

Permalink
Refactor library API.
Browse files Browse the repository at this point in the history
The do* functions have been split up to decouple ap_pipe
implementation details (in particular, the structure of the
repository) from clients. This should make it easier to make future
improvements to ap_pipe without breaking client code.

As an immediate benefit, the new API has fewer functions and is much
more focused towards running parts or all of the AP pipeline.
  • Loading branch information
kfindeisen committed Oct 24, 2017
1 parent d8428ab commit ffdaf38
Showing 1 changed file with 154 additions and 19 deletions.
173 changes: 154 additions & 19 deletions python/lsst/ap/pipe/ap_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

from __future__ import absolute_import, division, print_function

__all__ = ['get_datafiles', 'get_calib_datafiles', 'get_defectfiles', 'get_output_repo',
__all__ = ['get_output_repo',
'doIngest', 'doIngestCalibs', 'doProcessCcd', 'doDiffIm', 'doAssociation',
'runPipelineAlone']

Expand Down Expand Up @@ -242,7 +242,36 @@ def parsePipelineArgs():
return repos_and_files, idlist


def doIngest(repo, refcats, datafiles):
def doIngest(base_repo, raw_dir, refcat_dir):
'''
Ingest raw DECam images and reference catalogs into a repository
Parameters
----------
base_repo: `str`
The output repository location on disk.
raw_dir: `str`
A directory containing raw image files.
refcats: `str`
A directory containing two .tar.gz files with LSST-formatted astrometric
and photometric reference catalog information. The filenames are set below.
Returns
-------
ingest_metadata: `PropertySet` or None
Metadata from the IngestTask for use by ap_verify
Notes
-----
This function ingests *all* the images, not just the ones for the
specified visits and/or filters. We may want to revisit this in the future.
'''
raw_repo = get_output_repo(base_repo, INGESTED_DIR)
datafiles = get_datafiles(raw_dir)
return _doIngest(raw_repo, refcat_dir, datafiles)


def _doIngest(repo, refcats, datafiles):
'''
Ingest raw DECam images into a repository with a corresponding registry
Expand Down Expand Up @@ -281,7 +310,7 @@ def doIngest(repo, refcats, datafiles):
ASTROM_REFCAT_DIR = 'ref_cats/gaia'
PHOTOM_REFCAT_DIR = 'ref_cats/pan-starrs'

log = lsst.log.Log.getLogger('ap.pipe.doIngest')
log = lsst.log.Log.getLogger('ap.pipe._doIngest')
if os.path.exists(os.path.join(repo, 'registry.sqlite3')):
log.warn('Raw images were previously ingested, skipping...')
return None
Expand Down Expand Up @@ -320,7 +349,7 @@ def doIngest(repo, refcats, datafiles):

def flatBiasIngest(repo, calib_repo, calib_datafiles):
'''
Ingest DECam flats and biases (called by doIngestCalibs)
Ingest DECam flats and biases (called by _doIngestCalibs)
Parameters
----------
Expand Down Expand Up @@ -363,7 +392,7 @@ def flatBiasIngest(repo, calib_repo, calib_datafiles):

def defectIngest(repo, calib_repo, defectfiles):
'''
Ingest DECam defect images (called by doIngestCalibs)
Ingest DECam defect images (called by _doIngestCalibs)
Parameters
----------
Expand Down Expand Up @@ -424,7 +453,39 @@ def defectIngest(repo, calib_repo, defectfiles):
return defect_metadata


def doIngestCalibs(repo, calib_repo, calib_datafiles, defectfiles):
def doIngestCalibs(base_repo, calib_dir, defect_dir):
'''
Ingest DECam MasterCal biases, flats, and defects into the working
repository.
Parameters
----------
base_repo: `str`
The output repository location on disk.
calib_dir: `str`
The input directory containing the flat and bias image files.
defect_dir: `str`
The input directory containing the defect image files.
Returns
-------
calibingest_metadata: `PropertySet` or None
Metadata from the IngestCalibTask (flats and biases) and from the
IngestCalibTask (defects) for use by ap_verify
Notes
-----
calib ingestion ingests *all* the calibs, not just the ones needed
for certain visits. We may want to ...revisit... this in the future.
'''
repo = get_output_repo(base_repo, INGESTED_DIR)
calib_repo = get_output_repo(base_repo, CALIBINGESTED_DIR)
calib_datafiles = get_calib_datafiles(calib_dir)
defectfiles = get_defectfiles(defect_dir)
return _doIngestCalibs(repo, calib_repo, calib_datafiles, defectfiles)


def _doIngestCalibs(repo, calib_repo, calib_datafiles, defectfiles):
'''
Ingest DECam MasterCal biases and flats into a calibration repository with a corresponding registry.
Also ingest DECam defects into the calib registry.
Expand Down Expand Up @@ -458,7 +519,7 @@ def doIngestCalibs(repo, calib_repo, calib_datafiles, defectfiles):
calib ingestion ingests *all* the calibs, not just the ones needed
for certain visits. We may want to ...revisit... this in the future.
'''
log = lsst.log.Log.getLogger('ap.pipe.doIngestCalibs')
log = lsst.log.Log.getLogger('ap.pipe._doIngestCalibs')
if not os.path.isdir(calib_repo):
os.mkdir(calib_repo)
flatBias_metadata = flatBiasIngest(repo, calib_repo, calib_datafiles)
Expand All @@ -480,7 +541,33 @@ def doIngestCalibs(repo, calib_repo, calib_datafiles, defectfiles):
return calibingest_metadata


def doProcessCcd(repo, calib_repo, processed_repo, dataId):
def doProcessCcd(base_repo, dataId):
'''
Perform ISR with ingested images and calibrations via processCcd
By default, the configuration for astrometric reference catalogs uses Gaia
and the configuration for photometry reference catalogs uses Pan-STARRS.
Parameters
----------
base_repo: `str`
The output repository location on disk.
dataId: `str`
Butler identifier naming the data to be processed (e.g., visit and ccdnum)
formatted in the usual way (e.g., 'visit=54321 ccdnum=7').
Returns
-------
process_metadata: `PropertySet` or None
Metadata from the ProcessCcdTask for use by ap_verify
'''
raw_repo = get_output_repo(base_repo, INGESTED_DIR)
calib_repo = get_output_repo(base_repo, CALIBINGESTED_DIR)
processed_repo = get_output_repo(base_repo, PROCESSED_DIR)
return _doProcessCcd(raw_repo, calib_repo, processed_repo, dataId)


def _doProcessCcd(repo, calib_repo, processed_repo, dataId):
'''
Perform ISR with ingested images and calibrations via processCcd
Expand Down Expand Up @@ -517,7 +604,7 @@ def doProcessCcd(repo, calib_repo, processed_repo, dataId):
By default, the configuration for astrometric reference catalogs uses Gaia
and the configuration for photometry reference catalogs uses Pan-STARRS.
'''
log = lsst.log.Log.getLogger('ap.pipe.doProcessCcd')
log = lsst.log.Log.getLogger('ap.pipe._doProcessCcd')
dataId_items = re.split('[ +=]', dataId)
dataId_dict = dict(zip(dataId_items[::2], dataId_items[1::2]))
if 'visit' not in dataId_dict.keys():
Expand Down Expand Up @@ -567,7 +654,33 @@ def doProcessCcd(repo, calib_repo, processed_repo, dataId):
return process_metadata


def doDiffIm(processed_repo, dataId, template, diffim_repo):
def doDiffIm(base_repo, dataId):
'''
Do difference imaging with an automatically selected template.
Parameters
----------
base_repo: `str`
The output repository location on disk.
dataId: `str`
Butler identifier naming the data to be processed (e.g., visit and ccdnum)
formatted in the usual way (e.g., 'visit=54321 ccdnum=7').
Returns
-------
diffim_metadata: `PropertySet` or None
Metadata from the ImageDifferenceTask for use by ap_verify
'''
# TEMPORARY HARDWIRED THINGS ARE TEMPORARY
# TODO (DM-11422):
# - use a coadd as a template instead of a visit
template = '410929' # one g-band visit in Blind15A40, temporarily hard-wired
processed_repo = get_output_repo(base_repo, PROCESSED_DIR)
diffim_repo = get_output_repo(base_repo, DIFFIM_DIR)
return _doDiffIm(processed_repo, dataId, template, diffim_repo)


def _doDiffIm(processed_repo, dataId, template, diffim_repo):
'''
Do difference imaging with a visit as a template and one or more as science
Expand Down Expand Up @@ -606,7 +719,7 @@ def doDiffIm(processed_repo, dataId, template, diffim_repo):
diffim_repo/deepDiff/v+visit populated with difference images
and catalogs of detected sources (diaSrc, diffexp, and metadata files)
'''
log = lsst.log.Log.getLogger('ap.pipe.doDiffIm')
log = lsst.log.Log.getLogger('ap.pipe._doDiffIm')
dataId_items = re.split('[ +=]', dataId)
dataId_dict = dict(zip(dataId_items[::2], dataId_items[1::2]))
if 'visit' not in dataId_dict.keys():
Expand Down Expand Up @@ -670,7 +783,29 @@ def _deStringDataId(dataId):
dataId[key] = int(value)


def doAssociation(diffim_repo, dataId, db_repo):
def doAssociation(base_repo, dataId):
'''
Do source association.
Parameters
----------
base_repo: `str`
The output repository location on disk.
dataId: `str`
Butler identifier naming the data to be processed (e.g., visit and ccdnum)
formatted in the usual way (e.g., 'visit=54321 ccdnum=7').
Returns
-------
assoc_metadata: `PropertySet` or None
Metadata from the AssociationTask for use by ap_verify
'''
diffim_repo = get_output_repo(base_repo, DIFFIM_DIR)
db_repo = get_output_repo(base_repo, DB_DIR)
return _doAssociation(diffim_repo, dataId, db_repo)


def _doAssociation(diffim_repo, dataId, db_repo):
'''
Do source association.
Expand All @@ -689,7 +824,7 @@ def doAssociation(diffim_repo, dataId, db_repo):
assoc_metadata: `PropertySet` or None
Metadata from the AssociationTask for use by ap_verify
'''
log = lsst.log.Log.getLogger('ap.pipe.doAssociation')
log = lsst.log.Log.getLogger('ap.pipe._doAssociation')
dataId_items = re.split('[ +=]', dataId)
dataId_dict = dict(zip(dataId_items[::2], dataId_items[1::2]))
if 'visit' not in dataId_dict.keys():
Expand Down Expand Up @@ -750,12 +885,12 @@ def runPipelineAlone():
dataId_template = 'visit=410929 ccdnum=25' # temporary

# Run all the tasks in order
doIngest(repo, refcats, datafiles)
doIngestCalibs(repo, calib_repo, calib_datafiles, defectfiles)
doProcessCcd(repo, calib_repo, processed_repo, dataId)
doProcessCcd(repo, calib_repo, processed_repo, dataId_template) # temporary
doDiffIm(processed_repo, dataId, template, diffim_repo)
doAssociation(diffim_repo, dataId, db_repo)
_doIngest(repo, refcats, datafiles)
_doIngestCalibs(repo, calib_repo, calib_datafiles, defectfiles)
_doProcessCcd(repo, calib_repo, processed_repo, dataId)
_doProcessCcd(repo, calib_repo, processed_repo, dataId_template) # temporary
_doDiffIm(processed_repo, dataId, template, diffim_repo)
_doAssociation(diffim_repo, dataId, db_repo)
log.info('Prototype AP Pipeline run complete.')

return

0 comments on commit ffdaf38

Please sign in to comment.