-
Notifications
You must be signed in to change notification settings - Fork 18
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add a new task IngestCalibsTask for calibration images
It mirrors and extends IngestTask: it parses the file headers for information and adds the corresponding entries to the calibration registry of the data repository. The dataset type is determined by the OBSTYPE header keyword, and by default a calibration image is valid on its observation date only. The optional command-line argument "validity" can be used to set a wider validity period (in days). Overlaps between validity ranges are then resolved.
- Loading branch information
Hsin-Fang Chiang
committed
Sep 29, 2015
1 parent
398297e
commit 7627862
Showing
2 changed files
with
161 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
#!/usr/bin/env python | ||
# Command-line entry point: all work is delegated to IngestCalibsTask.parseAndRun() | ||
from lsst.pipe.tasks.ingestCalibs import IngestCalibsTask | ||
IngestCalibsTask.parseAndRun() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,158 @@ | ||
import collections | ||
import datetime | ||
import sqlite3 | ||
from lsst.pex.config import Config, ListField, ConfigurableField | ||
from lsst.pipe.base import ArgumentParser | ||
from lsst.pipe.tasks.ingest import RegisterTask, ParseTask, RegisterConfig, IngestTask | ||
|
||
def _convertToDate(dateString):
    """Convert a string into a date object

    @param dateString: date written in the format %Y-%m-%d
    @return a datetime.date, or None when dateString is not a string
            (e.g. None) or cannot be converted with format %Y-%m-%d
    """
    try:
        return datetime.datetime.strptime(dateString, "%Y-%m-%d").date()
    except (TypeError, ValueError):
        # ValueError: a string in the wrong format.
        # TypeError: not a string at all, e.g. None read from an empty column.
        return None
|
||
class CalibsParseTask(ParseTask):
    """Task that will parse the filename and/or its contents to get the
    required information to populate the calibration registry."""

    def getCalibType(self, md):
        """Return a calibration dataset type derived from the observation
        type (OBSTYPE) in the header

        The OBSTYPE value is stripped and lower-cased, then mapped to
        "flat", "bias", "dark" or "fringe" according to the first keyword
        it contains ("zero" also maps to "bias").  A value containing none
        of the keywords is returned as is (stripped and lower-cased).

        @param md: (PropertySet) FITS header metadata
        @return the dataset type, or None when the header has no OBSTYPE
        """
        if not md.exists("OBSTYPE"):
            return None
        observed = md.get("OBSTYPE").strip().lower()
        # Checked in this order; the first matching entry wins.
        keywordTable = (
            (("flat",), "flat"),
            (("zero", "bias"), "bias"),
            (("dark",), "dark"),
            (("fringe",), "fringe"),
        )
        for keywords, calibType in keywordTable:
            if any(word in observed for word in keywords):
                return calibType
        return observed
|
||
class CalibsRegisterConfig(RegisterConfig):
    """Configuration for the CalibsRegisterTask"""
    # One registry table per calibration dataset type.
    tables = ListField(
        dtype=str,
        default=["bias", "dark", "flat", "fringe"],
        doc="Name of tables",
    )
|
||
class CalibsRegisterTask(RegisterTask):
    """Task that will generate the calibration registry for the Mapper"""
    ConfigClass = CalibsRegisterConfig

    def openRegistry(self, butler, create=False, dryrun=False, name="calibRegistry.sqlite3"):
        """Open the registry and return the connection handle

        Delegates to RegisterTask.openRegistry; only the default registry
        file name differs.
        """
        return RegisterTask.openRegistry(self, butler, create, dryrun, name)

    def createTable(self, conn):
        """Create the registry tables, one per entry of config.tables

        @param conn: Database connection
        """
        for table in self.config.tables:
            RegisterTask.createTable(self, conn, table=table)

    def updateValidityRanges(self, conn):
        """Loop over all tables, filters, and ccdnums,
        and update the validity ranges in the registry.

        Note that this sets conn.row_factory to sqlite3.Row.

        @param conn: Database connection
        """
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        for table in self.config.tables:
            # Table names cannot be bound as SQL parameters; they come from the config.
            cursor.execute("SELECT DISTINCT filter,ccdnum FROM %s" % table)
            for row in cursor.fetchall():
                self.resolveOverlaps(conn, table, str(row["filter"]), row["ccdnum"])

    def resolveOverlaps(self, conn, table, filter, ccdnum):
        """Fix overlaps of validity ranges among selected rows in the registry

        Rows are ordered by calibDate.  For each pair of consecutive
        calibration dates, when the validity of the earlier one extends past
        the midpoint between the two dates, it is cut at the midpoint and the
        later one is made to start the day after.  All selected rows are then
        written back.  Nothing is changed (a warning is logged) when any date
        of the selected rows cannot be parsed.

        @param conn: Database connection
        @param table: Name of table to be selected
        @param filter: Select condition for column filter
        @param ccdnum: Select condition for column ccdnum
        """
        # Values are bound as parameters so that quotes in a filter name
        # cannot break (or alter) the statement.
        selectSql = ("SELECT id, calibDate, validStart, validEnd FROM %s"
                     " WHERE filter=? AND ccdnum=? ORDER BY calibDate" % table)
        cursor = conn.cursor()
        cursor.execute(selectSql, (filter, ccdnum))

        # Unpack by position: works for sqlite3.Row and for plain tuples alike.
        rowDates = []   # (row id, parsed calibDate) for every selected row
        valids = collections.OrderedDict()   # calibDate -> [validStart, validEnd]
        for rowId, calibDateStr, validStartStr, validEndStr in cursor.fetchall():
            calibDate = _convertToDate(calibDateStr)
            rowDates.append((rowId, calibDate))
            valids[calibDate] = [_convertToDate(validStartStr), _convertToDate(validEndStr)]

        # A list (not a dict view) so that it can be sliced on Python 3 as well.
        dates = list(valids)
        if None in dates or any(None in bounds for bounds in valids.values()):
            self.log.warn("Skipped fixing the validity overlaps for %s filter=%s ccdnum=%s"
                          % (table, filter, ccdnum))
            return

        # NOTE(review): a range is only trimmed when the earlier calibration's
        # validEnd passes the midpoint; a later calibration whose validStart
        # reaches back before the midpoint is left untouched - confirm intended.
        for date, nextDate in zip(dates[:-1], dates[1:]):
            midpoint = date + (nextDate - date)//2
            if valids[date][1] > midpoint:
                valids[nextDate][0] = midpoint + datetime.timedelta(1)
                valids[date][1] = midpoint

        updateSql = "UPDATE %s SET validStart=?, validEnd=? WHERE id=?" % table
        for rowId, calibDate in rowDates:
            validStart, validEnd = valids[calibDate]
            conn.execute(updateSql, (validStart.isoformat(), validEnd.isoformat(), rowId))
|
||
class IngestCalibsArgumentParser(ArgumentParser):
    """Argument parser to support ingesting calibration images into the repository"""

    def __init__(self, *args, **kwargs):
        """Build the base parser, then register the ingest-specific arguments"""
        ArgumentParser.__init__(self, *args, **kwargs)
        # (flags, options) pairs, added in this order.
        extraArguments = (
            (("-n", "--dry-run"), dict(dest="dryrun", action="store_true",
                                       default=False, help="Don't perform any action?")),
            (("--create",), dict(action="store_true", help="Create new registry?")),
            (("--validity",), dict(type=int, help="Calibration validity period (days)")),
            (("files",), dict(nargs="+", help="Names of file")),
        )
        for flags, options in extraArguments:
            self.add_argument(*flags, **options)
|
||
class IngestCalibsConfig(Config):
    """Configuration for IngestCalibsTask"""
    # Subtask that extracts the registry information from each file.
    parse = ConfigurableField(
        target=CalibsParseTask,
        doc="File parsing",
    )
    # Subtask that writes the entries to the calibration registry.
    register = ConfigurableField(
        target=CalibsRegisterTask,
        doc="Registry entry",
    )
|
||
class IngestCalibsTask(IngestTask):
    """Task that generates registry for calibration images"""
    ConfigClass = IngestCalibsConfig
    ArgumentParser = IngestCalibsArgumentParser
    _DefaultName = "ingestCalibs"

    def run(self, args):
        """Ingest all specified files and add them to the registry

        Each HDU whose obstype names one of the registry tables is added to
        that table; the others are skipped with a warning.  When
        args.validity is given, the validity range is set to the calibration
        date plus and minus that many days.  Finally the overlaps between
        validity ranges are resolved, unless this is a dry run.

        @param args: parsed command line; butler, create, dryrun, files and
                     validity are read from it
        """
        with self.register.openRegistry(args.butler, create=args.create, dryrun=args.dryrun) as registry:
            for infile in args.files:
                _fileInfo, hduInfoList = self.parse.getInfo(infile)
                for info in hduInfoList:
                    obstype = info["obstype"]
                    if obstype not in self.register.config.tables:
                        self.log.warn("Skipped adding %s of obstype %s to registry" % (infile, obstype))
                        continue
                    info['path'] = infile
                    if args.validity is not None:
                        try:
                            calibDate = _convertToDate(info['calibDate'])
                            halfWidth = datetime.timedelta(args.validity)
                            # calibDate is None for an unparseable date, which
                            # makes the arithmetic raise TypeError.
                            info['validStart'] = (calibDate - halfWidth).isoformat()
                            info['validEnd'] = (calibDate + halfWidth).isoformat()
                        except TypeError:
                            self.log.warn("Skipped setting validity period of %s" % args.validity)
                    self.register.addRow(registry, info, dryrun=args.dryrun, create=args.create,
                                         table=obstype)
            # Still inside the "with" block: the registry must be open here.
            if not args.dryrun:
                self.register.updateValidityRanges(registry)
            else:
                self.log.info("Would update validity ranges to resolve overlaps")