Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-12257: Implement association step in ap_verify #8

Merged
merged 1 commit into from
Oct 26, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
173 changes: 154 additions & 19 deletions python/lsst/ap/pipe/ap_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

from __future__ import absolute_import, division, print_function

__all__ = ['get_datafiles', 'get_calib_datafiles', 'get_defectfiles', 'get_output_repo',
__all__ = ['get_output_repo',
'doIngest', 'doIngestCalibs', 'doProcessCcd', 'doDiffIm', 'doAssociation',
'runPipelineAlone']

Expand Down Expand Up @@ -242,7 +242,36 @@ def parsePipelineArgs():
return repos_and_files, idlist


def doIngest(repo, refcats, datafiles):
def doIngest(base_repo, raw_dir, refcat_dir):
'''
Ingest raw DECam images and reference catalogs into a repository

Parameters
----------
base_repo: `str`
The output repository location on disk.
raw_dir: `str`
A directory containing raw image files.
refcats: `str`
A directory containing two .tar.gz files with LSST-formatted astrometric
and photometric reference catalog information. The filenames are set below.

Returns
-------
ingest_metadata: `PropertySet` or None
Metadata from the IngestTask for use by ap_verify

Notes
-----
This function ingests *all* the images, not just the ones for the
specified visits and/or filters. We may want to revisit this in the future.
'''
raw_repo = get_output_repo(base_repo, INGESTED_DIR)
datafiles = get_datafiles(raw_dir)
return _doIngest(raw_repo, refcat_dir, datafiles)


def _doIngest(repo, refcats, datafiles):
'''
Ingest raw DECam images into a repository with a corresponding registry

Expand Down Expand Up @@ -281,7 +310,7 @@ def doIngest(repo, refcats, datafiles):
ASTROM_REFCAT_DIR = 'ref_cats/gaia'
PHOTOM_REFCAT_DIR = 'ref_cats/pan-starrs'

log = lsst.log.Log.getLogger('ap.pipe.doIngest')
log = lsst.log.Log.getLogger('ap.pipe._doIngest')
if os.path.exists(os.path.join(repo, 'registry.sqlite3')):
log.warn('Raw images were previously ingested, skipping...')
return None
Expand Down Expand Up @@ -320,7 +349,7 @@ def doIngest(repo, refcats, datafiles):

def flatBiasIngest(repo, calib_repo, calib_datafiles):
'''
Ingest DECam flats and biases (called by doIngestCalibs)
Ingest DECam flats and biases (called by _doIngestCalibs)

Parameters
----------
Expand Down Expand Up @@ -363,7 +392,7 @@ def flatBiasIngest(repo, calib_repo, calib_datafiles):

def defectIngest(repo, calib_repo, defectfiles):
'''
Ingest DECam defect images (called by doIngestCalibs)
Ingest DECam defect images (called by _doIngestCalibs)

Parameters
----------
Expand Down Expand Up @@ -424,7 +453,39 @@ def defectIngest(repo, calib_repo, defectfiles):
return defect_metadata


def doIngestCalibs(repo, calib_repo, calib_datafiles, defectfiles):
def doIngestCalibs(base_repo, calib_dir, defect_dir):
'''
Ingest DECam MasterCal biases, flats, and defects into the working
repository.

Parameters
----------
base_repo: `str`
The output repository location on disk.
calib_dir: `str`
The input directory containing the flat and bias image files.
defect_dir: `str`
The input directory containing the defect image files.

Returns
-------
calibingest_metadata: `PropertySet` or None
Metadata from the IngestCalibTask (flats and biases) and from the
IngestCalibTask (defects) for use by ap_verify

Notes
-----
calib ingestion ingests *all* the calibs, not just the ones needed
for certain visits. We may want to ...revisit... this in the future.
'''
repo = get_output_repo(base_repo, INGESTED_DIR)
calib_repo = get_output_repo(base_repo, CALIBINGESTED_DIR)
calib_datafiles = get_calib_datafiles(calib_dir)
defectfiles = get_defectfiles(defect_dir)
return _doIngestCalibs(repo, calib_repo, calib_datafiles, defectfiles)


def _doIngestCalibs(repo, calib_repo, calib_datafiles, defectfiles):
'''
Ingest DECam MasterCal biases and flats into a calibration repository with a corresponding registry.
Also ingest DECam defects into the calib registry.
Expand Down Expand Up @@ -458,7 +519,7 @@ def doIngestCalibs(repo, calib_repo, calib_datafiles, defectfiles):
calib ingestion ingests *all* the calibs, not just the ones needed
for certain visits. We may want to ...revisit... this in the future.
'''
log = lsst.log.Log.getLogger('ap.pipe.doIngestCalibs')
log = lsst.log.Log.getLogger('ap.pipe._doIngestCalibs')
if not os.path.isdir(calib_repo):
os.mkdir(calib_repo)
flatBias_metadata = flatBiasIngest(repo, calib_repo, calib_datafiles)
Expand All @@ -480,7 +541,33 @@ def doIngestCalibs(repo, calib_repo, calib_datafiles, defectfiles):
return calibingest_metadata


def doProcessCcd(repo, calib_repo, processed_repo, dataId):
def doProcessCcd(base_repo, dataId):
'''
Perform ISR with ingested images and calibrations via processCcd

By default, the configuration for astrometric reference catalogs uses Gaia
and the configuration for photometry reference catalogs uses Pan-STARRS.

Parameters
----------
base_repo: `str`
The output repository location on disk.
dataId: `str`
Butler identifier naming the data to be processed (e.g., visit and ccdnum)
formatted in the usual way (e.g., 'visit=54321 ccdnum=7').

Returns
-------
process_metadata: `PropertySet` or None
Metadata from the ProcessCcdTask for use by ap_verify
'''
raw_repo = get_output_repo(base_repo, INGESTED_DIR)
calib_repo = get_output_repo(base_repo, CALIBINGESTED_DIR)
processed_repo = get_output_repo(base_repo, PROCESSED_DIR)
return _doProcessCcd(raw_repo, calib_repo, processed_repo, dataId)


def _doProcessCcd(repo, calib_repo, processed_repo, dataId):
'''
Perform ISR with ingested images and calibrations via processCcd

Expand Down Expand Up @@ -517,7 +604,7 @@ def doProcessCcd(repo, calib_repo, processed_repo, dataId):
By default, the configuration for astrometric reference catalogs uses Gaia
and the configuration for photometry reference catalogs uses Pan-STARRS.
'''
log = lsst.log.Log.getLogger('ap.pipe.doProcessCcd')
log = lsst.log.Log.getLogger('ap.pipe._doProcessCcd')
dataId_items = re.split('[ +=]', dataId)
dataId_dict = dict(zip(dataId_items[::2], dataId_items[1::2]))
if 'visit' not in dataId_dict.keys():
Expand Down Expand Up @@ -567,7 +654,33 @@ def doProcessCcd(repo, calib_repo, processed_repo, dataId):
return process_metadata


def doDiffIm(processed_repo, dataId, template, diffim_repo):
def doDiffIm(base_repo, dataId):
'''
Do difference imaging with an automatically selected template.

Parameters
----------
base_repo: `str`
The output repository location on disk.
dataId: `str`
Butler identifier naming the data to be processed (e.g., visit and ccdnum)
formatted in the usual way (e.g., 'visit=54321 ccdnum=7').

Returns
-------
diffim_metadata: `PropertySet` or None
Metadata from the ImageDifferenceTask for use by ap_verify
'''
# TEMPORARY HARDWIRED THINGS ARE TEMPORARY
# TODO (DM-11422):
# - use a coadd as a template instead of a visit
template = '410929' # one g-band visit in Blind15A40, temporarily hard-wired
processed_repo = get_output_repo(base_repo, PROCESSED_DIR)
diffim_repo = get_output_repo(base_repo, DIFFIM_DIR)
return _doDiffIm(processed_repo, dataId, template, diffim_repo)


def _doDiffIm(processed_repo, dataId, template, diffim_repo):
'''
Do difference imaging with a visit as a template and one or more as science

Expand Down Expand Up @@ -606,7 +719,7 @@ def doDiffIm(processed_repo, dataId, template, diffim_repo):
diffim_repo/deepDiff/v+visit populated with difference images
and catalogs of detected sources (diaSrc, diffexp, and metadata files)
'''
log = lsst.log.Log.getLogger('ap.pipe.doDiffIm')
log = lsst.log.Log.getLogger('ap.pipe._doDiffIm')
dataId_items = re.split('[ +=]', dataId)
dataId_dict = dict(zip(dataId_items[::2], dataId_items[1::2]))
if 'visit' not in dataId_dict.keys():
Expand Down Expand Up @@ -670,7 +783,29 @@ def _deStringDataId(dataId):
dataId[key] = int(value)


def doAssociation(diffim_repo, dataId, db_repo):
def doAssociation(base_repo, dataId):
'''
Do source association.

Parameters
----------
base_repo: `str`
The output repository location on disk.
dataId: `str`
Butler identifier naming the data to be processed (e.g., visit and ccdnum)
formatted in the usual way (e.g., 'visit=54321 ccdnum=7').

Returns
-------
assoc_metadata: `PropertySet` or None
Metadata from the AssociationTask for use by ap_verify
'''
diffim_repo = get_output_repo(base_repo, DIFFIM_DIR)
db_repo = get_output_repo(base_repo, DB_DIR)
return _doAssociation(diffim_repo, dataId, db_repo)


def _doAssociation(diffim_repo, dataId, db_repo):
'''
Do source association.

Expand All @@ -689,7 +824,7 @@ def doAssociation(diffim_repo, dataId, db_repo):
assoc_metadata: `PropertySet` or None
Metadata from the AssociationTask for use by ap_verify
'''
log = lsst.log.Log.getLogger('ap.pipe.doAssociation')
log = lsst.log.Log.getLogger('ap.pipe._doAssociation')
dataId_items = re.split('[ +=]', dataId)
dataId_dict = dict(zip(dataId_items[::2], dataId_items[1::2]))
if 'visit' not in dataId_dict.keys():
Expand Down Expand Up @@ -750,12 +885,12 @@ def runPipelineAlone():
dataId_template = 'visit=410929 ccdnum=25' # temporary

# Run all the tasks in order
doIngest(repo, refcats, datafiles)
doIngestCalibs(repo, calib_repo, calib_datafiles, defectfiles)
doProcessCcd(repo, calib_repo, processed_repo, dataId)
doProcessCcd(repo, calib_repo, processed_repo, dataId_template) # temporary
doDiffIm(processed_repo, dataId, template, diffim_repo)
doAssociation(diffim_repo, dataId, db_repo)
_doIngest(repo, refcats, datafiles)
_doIngestCalibs(repo, calib_repo, calib_datafiles, defectfiles)
_doProcessCcd(repo, calib_repo, processed_repo, dataId)
_doProcessCcd(repo, calib_repo, processed_repo, dataId_template) # temporary
_doDiffIm(processed_repo, dataId, template, diffim_repo)
_doAssociation(diffim_repo, dataId, db_repo)
log.info('Prototype AP Pipeline run complete.')

return