Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-12535: Wrap ap_verify and run it over HITS dataset #11

Merged
merged 2 commits into from
Dec 5, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
45 changes: 31 additions & 14 deletions python/lsst/ap/pipe/ap_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
from glob import glob
import sqlite3
import re
import shutil

import lsst.log
from lsst.obs.decam import ingest
Expand Down Expand Up @@ -208,6 +207,9 @@ def parsePipelineArgs():
parser.add_argument('-i', '--dataId',
help="Butler identifier naming the data to be processed (e.g., visit and ccdnum) "
"formatted in the usual way (e.g., 'visit=54321 ccdnum=7').")
parser.add_argument('--no-skip', dest='skip', action='store_false',
help="Do not skip pipeline steps that have already been started. Necessary for "
"processing multiple data IDs in the same repository.")
templateFlags = parser.add_mutually_exclusive_group()
templateFlags.add_argument('--templateId', default='visit=410929',
help="A Butler identifier naming a visit to use as the template "
Expand All @@ -232,6 +234,8 @@ def parsePipelineArgs():
# Retrieve location of refcats directory containing gaia and pan-starrs tarballs
refcats = os.path.join(args.dataset_root, REFCATS_DIR)

skip = args.skip

# Stringly typed code, but I don't see a safer way to do this in Python
if args.templateRepo is not None:
templateType = 'coadd'
Expand All @@ -247,7 +251,8 @@ def parsePipelineArgs():
'calib_datafiles': calib_datafiles, 'defectfiles': defectfiles,
'refcats': refcats,
'dataId': args.dataId,
'template_type': templateType, 'template': template}
'template_type': templateType, 'template': template,
'skip': skip}

return repos_and_files

Expand Down Expand Up @@ -612,10 +617,10 @@ def doProcessCcd(base_repo, dataId):
raw_repo = get_output_repo(base_repo, INGESTED_DIR)
calib_repo = get_output_repo(base_repo, CALIBINGESTED_DIR)
processed_repo = get_output_repo(base_repo, PROCESSED_DIR)
return _doProcessCcd(raw_repo, calib_repo, processed_repo, dataId)
return _doProcessCcd(raw_repo, calib_repo, processed_repo, dataId, skip=False)


def _doProcessCcd(repo, calib_repo, processed_repo, dataId):
def _doProcessCcd(repo, calib_repo, processed_repo, dataId, skip=True):
'''
Perform ISR with ingested images and calibrations via processCcd

Expand All @@ -630,6 +635,8 @@ def _doProcessCcd(repo, calib_repo, processed_repo, dataId):
dataId: `str`
Butler identifier naming the data to be processed (e.g., visit and ccdnum)
formatted in the usual way (e.g., 'visit=54321 ccdnum=7').
skip: `bool`
If set, _doProcessCcd will skip processing if data have already been processed.

Returns
-------
Expand Down Expand Up @@ -659,7 +666,7 @@ def _doProcessCcd(repo, calib_repo, processed_repo, dataId):
raise RuntimeError('The dataId string is missing \'visit\'')
else: # save the visit number from the dataId
visit = dataId_dict['visit']
if os.path.isdir(os.path.join(processed_repo, '0'+visit)):
if skip and os.path.isdir(os.path.join(processed_repo, '0'+visit)):
log.warn('ProcessCcd has already been run for visit {0}, skipping...'.format(visit))
return None
if not os.path.isdir(processed_repo):
Expand Down Expand Up @@ -726,10 +733,10 @@ def doDiffIm(base_repo, templateDir, dataId):
diffim_repo = get_output_repo(base_repo, DIFFIM_DIR)
# TODO: remove this once DM-11865 is resolved
_doIngestTemplates(repo, repo, templateDir)
return _doDiffIm(processed_repo, dataId, 'coadd', templateDir, diffim_repo)
return _doDiffIm(processed_repo, dataId, 'coadd', templateDir, diffim_repo, skip=False)


def _doDiffIm(processed_repo, dataId, templateType, template, diffim_repo):
def _doDiffIm(processed_repo, dataId, templateType, template, diffim_repo, skip=True):
'''
Do difference imaging with a template and one or more visits as science

Expand All @@ -749,6 +756,8 @@ def _doDiffIm(processed_repo, dataId, templateType, template, diffim_repo):
template for difference imaging.
diffim_repo: `str`
The output repository location on disk where difference images live.
skip: `bool`
If set, _doDiffIm will skip processing if data have already been processed.

Returns
-------
Expand Down Expand Up @@ -778,7 +787,7 @@ def _doDiffIm(processed_repo, dataId, templateType, template, diffim_repo):
visit = dataId_dict['visit']
_deStringDataId(dataId_dict)

if os.path.exists(os.path.join(diffim_repo, 'deepDiff', 'v' + visit)):
if skip and os.path.exists(os.path.join(diffim_repo, 'deepDiff', 'v' + visit)):
log.warn('DiffIm has already been run for visit {0}, skipping...'.format(visit))
return None

Expand Down Expand Up @@ -841,6 +850,10 @@ def _deStringDataId(dataId):
dataId: `dict`
The dataId to be cleaned up.
'''
try:
basestring
except NameError:
basestring = str
integer = re.compile('^\s*[+-]?\d+\s*$')
for key, value in dataId.items():
if isinstance(value, basestring) and integer.match(value) is not None:
Expand All @@ -866,10 +879,10 @@ def doAssociation(base_repo, dataId):
'''
diffim_repo = get_output_repo(base_repo, DIFFIM_DIR)
db_repo = get_output_repo(base_repo, DB_DIR)
return _doAssociation(diffim_repo, dataId, db_repo)
return _doAssociation(diffim_repo, dataId, db_repo, skip=False)


def _doAssociation(diffim_repo, dataId, db_repo):
def _doAssociation(diffim_repo, dataId, db_repo, skip=True):
'''
Do source association.

Expand All @@ -882,6 +895,8 @@ def _doAssociation(diffim_repo, dataId, db_repo):
formatted in the usual way (e.g., 'visit=54321 ccdnum=7').
db_repo: `str`
The output repository location on disk where the source database lives.
skip: `bool`
If set, _doAssociation will skip processing if data have already been processed.

Returns
-------
Expand Down Expand Up @@ -943,14 +958,16 @@ def runPipelineAlone():

refcats = parsed['refcats']

skip = parsed['skip']

dataId = parsed['dataId']
templateType = parsed['template_type']
template = parsed['template']

# Run all the tasks in order
_doIngest(repo, refcats, datafiles)
_doIngestCalibs(repo, calib_repo, calib_datafiles, defectfiles)
_doProcessCcd(repo, calib_repo, processed_repo, dataId)
_doProcessCcd(repo, calib_repo, processed_repo, dataId, skip=skip)
if templateType == 'coadd':
# TODO: should be unnecessary once DM-11865 is resolved
_doIngestTemplates(repo, repo, template)
Expand All @@ -960,11 +977,11 @@ def runPipelineAlone():
if 'ccdnum' not in dataId_dict.keys():
raise RuntimeError('The dataId string is missing \'ccdnum\'')
ccdTemplate = template + (' ccdnum=%s' % dataId_dict['ccdnum'])
_doProcessCcd(repo, calib_repo, processed_repo, ccdTemplate)
_doProcessCcd(repo, calib_repo, processed_repo, ccdTemplate, skip=skip)
else:
raise ValueError('templateType must be "coadd" or "visit", gave "%s" instead' % templateType)
_doDiffIm(processed_repo, dataId, templateType, template, diffim_repo)
_doAssociation(diffim_repo, dataId, db_repo)
_doDiffIm(processed_repo, dataId, templateType, template, diffim_repo, skip=skip)
_doAssociation(diffim_repo, dataId, db_repo, skip=skip)
log.info('Prototype AP Pipeline run complete.')

return