Skip to content

Commit

Permalink
Add a new task IngestCalibsTask for calibration images
Browse files Browse the repository at this point in the history
It mirrors and expands IngestTask to parse through the file
headers for information and add to the calibration registry
of the data repository.  Dataset type is determined by the
header info OBSTYPE, and by default the calibration images
are valid on the observation date only. Optional argument
"validity" can be used to set a wider validity period on the
command line. Overlaps in validity ranges are fixed.
  • Loading branch information
Hsin-Fang Chiang committed Sep 30, 2015
1 parent 521069b commit 0fa09db
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 0 deletions.
3 changes: 3 additions & 0 deletions bin/ingestCalibs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/usr/bin/env python
# Command-line entry point for ingesting calibration images: it only imports
# IngestCalibsTask and hands control to it.
from lsst.pipe.tasks.ingestCalibs import IngestCalibsTask
# parseAndRun is not defined in the code visible here (presumably inherited
# from the task's base class); it is expected to parse the command line and
# run the task -- confirm against the base class.
IngestCalibsTask.parseAndRun()
166 changes: 166 additions & 0 deletions python/lsst/pipe/tasks/ingestCalibs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
import collections
import datetime
import sqlite3
from lsst.pex.config import Config, ListField, ConfigurableField
from lsst.pipe.base import ArgumentParser
from lsst.pipe.tasks.ingest import RegisterTask, ParseTask, RegisterConfig, IngestTask


def _convertToDate(dateString):
"""Convert a string into a date object, or return None
when the date string cannot be converted with format %Y-%m-%d
"""
try:
return datetime.datetime.strptime(dateString, "%Y-%m-%d").date()
except ValueError:
return None


class CalibsParseTask(ParseTask):
    """Parse a file name and/or the file contents to obtain the
    information needed to populate the calibration registry."""

    def getCalibType(self, md):
        """Map the observation type in the header onto a calibration dataset type

        @param md: (PropertySet) FITS header metadata
        @return "flat", "bias", "dark" or "fringe" when the OBSTYPE value
                contains a matching keyword; otherwise the stripped,
                lower-cased OBSTYPE value itself; None when the header has
                no OBSTYPE entry
        """
        if not md.exists("OBSTYPE"):
            return None
        # Checked in this order; the first dataset type with a keyword found
        # in the header value wins.
        keywordsByType = (
            ("flat", ("flat",)),
            ("bias", ("zero", "bias")),
            ("dark", ("dark",)),
            ("fringe", ("fringe",)),
        )
        headerValue = md.get("OBSTYPE").strip().lower()
        for calibType, keywords in keywordsByType:
            if any(word in headerValue for word in keywords):
                return calibType
        return headerValue


class CalibsRegisterConfig(RegisterConfig):
    """Settings for CalibsRegisterTask: the list of registry tables it handles"""
    tables = ListField(
        doc="Name of tables",
        dtype=str,
        default=["bias", "dark", "flat", "fringe"],
    )


class CalibsRegisterTask(RegisterTask):
    """Task that will generate the calibration registry for the Mapper"""
    ConfigClass = CalibsRegisterConfig

    def openRegistry(self, butler, create=False, dryrun=False, name="calibRegistry.sqlite3"):
        """Open the registry and return the connection handle

        Delegates to RegisterTask.openRegistry, with the calibration
        registry file name as the default for name.

        @param butler: Passed through to RegisterTask.openRegistry
        @param create: Passed through to RegisterTask.openRegistry
        @param dryrun: Passed through to RegisterTask.openRegistry
        @param name: Name of the registry file
        @return whatever RegisterTask.openRegistry returns
        """
        return RegisterTask.openRegistry(self, butler, create, dryrun, name)

    def createTable(self, conn):
        """Create the registry tables, one per entry in config.tables

        @param conn: Database connection
        """
        for table in self.config.tables:
            RegisterTask.createTable(self, conn, table=table)

    def updateValidityRanges(self, conn):
        """Loop over all tables, filters, and ccdnums,
        and update the validity ranges in the registry.

        @param conn: Database connection; its row_factory is set to
                     sqlite3.Row as a side effect
        """
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        for table in self.config.tables:
            # Table names cannot be bound as SQL parameters; they come from
            # the task configuration, not from file contents.
            cursor.execute("SELECT DISTINCT filter,ccdnum FROM %s" % table)
            # Fetch everything first: resolveOverlaps issues its own
            # statements on the same connection.
            for row in cursor.fetchall():
                self.resolveOverlaps(conn, table, str(row["filter"]), row["ccdnum"])

    def resolveOverlaps(self, conn, table, filter, ccdnum):
        """Fix overlaps of validity ranges among selected rows in the registry

        Rows are ordered by calibDate.  Whenever the validity of a
        calibration extends past the midpoint between its date and the next
        calibration date, it is cut at that midpoint and the next
        calibration becomes valid from the following day.  If any date of
        the selected rows cannot be parsed, nothing is changed and a warning
        is logged.

        @param conn: Database connection
        @param table: Name of table to be selected
        @param filter: Select condition for column filter
        @param ccdnum: Select condition for column ccdnum
        """
        # Values are bound as parameters so that quotes in a filter name or
        # a NULL ccdnum cannot break the statement.
        sql = "SELECT id, calibDate, validStart, validEnd FROM %s" % table
        sql += " WHERE filter=? AND ccdnum=?"
        sql += " ORDER BY calibDate"
        cursor = conn.cursor()
        cursor.execute(sql, (filter, ccdnum))
        # Columns are read by position, so this works whether or not the
        # connection uses sqlite3.Row as its row factory.
        rows = [tuple(row) for row in cursor.fetchall()]

        # calibDate -> [validStart, validEnd], in calibDate order
        valids = collections.OrderedDict()
        idsAndDates = []
        for rowId, calibDate, validStart, validEnd in rows:
            date = _convertToDate(calibDate)
            valids[date] = [_convertToDate(validStart), _convertToDate(validEnd)]
            idsAndDates.append((rowId, date))

        # A real list is needed: the dates are sliced and paired below.
        dates = list(valids.keys())
        if None in dates or any(None in bounds for bounds in valids.values()):
            self.log.warn("Skipped fixing the validity overlaps for %s filter=%s ccdnum=%s" %
                          (table, filter, ccdnum))
            return

        for date, nextDate in zip(dates[:-1], dates[1:]):
            midpoint = date + (nextDate - date)//2
            if valids[date][1] > midpoint:
                valids[nextDate][0] = midpoint + datetime.timedelta(1)
                valids[date][1] = midpoint

        sql = "UPDATE %s SET validStart=?, validEnd=? WHERE id=?" % table
        conn.executemany(sql, [(valids[date][0].isoformat(), valids[date][1].isoformat(), rowId)
                               for rowId, date in idsAndDates])


class IngestCalibsArgumentParser(ArgumentParser):
    """Command-line parser for ingesting calibration images into the repository"""

    def __init__(self, *args, **kwargs):
        """Build the base parser, then register the ingest-specific arguments

        All positional and keyword arguments are forwarded unchanged to
        ArgumentParser.__init__.
        """
        ArgumentParser.__init__(self, *args, **kwargs)
        # (flags, settings) pairs, registered in this order.
        argumentSpecs = (
            (("-n", "--dry-run"), dict(dest="dryrun", action="store_true",
                                       default=False, help="Don't perform any action?")),
            (("--create",), dict(action="store_true", help="Create new registry?")),
            (("--validity",), dict(type=int, help="Calibration validity period (days)")),
            (("files",), dict(nargs="+", help="Names of file")),
        )
        for flags, settings in argumentSpecs:
            self.add_argument(*flags, **settings)


class IngestCalibsConfig(Config):
    """Settings for IngestCalibsTask: the parsing and registering subtasks"""
    parse = ConfigurableField(
        doc="File parsing",
        target=CalibsParseTask,
    )
    register = ConfigurableField(
        doc="Registry entry",
        target=CalibsRegisterTask,
    )


class IngestCalibsTask(IngestTask):
    """Task that builds the registry for calibration images"""
    ConfigClass = IngestCalibsConfig
    ArgumentParser = IngestCalibsArgumentParser
    _DefaultName = "ingestCalibs"

    def run(self, args):
        """Ingest all specified files and add them to the registry

        Each parsed entry goes into the table named after its obstype;
        entries whose obstype is not a configured table are skipped with a
        warning.  When args.validity is given, validStart/validEnd are set
        to the calibration date minus/plus that many days.  Afterwards the
        validity ranges are updated to resolve overlaps, unless this is a
        dry run.

        @param args: Parsed command-line arguments; butler, create, dryrun,
                     files and validity are read
        """
        isDryRun = args.dryrun
        with self.register.openRegistry(args.butler, create=args.create, dryrun=isDryRun) as registry:
            for infile in args.files:
                _, hduInfoList = self.parse.getInfo(infile)
                for hduInfo in hduInfoList:
                    obstype = hduInfo["obstype"]
                    if obstype not in self.register.config.tables:
                        self.log.warn("Skipped adding %s of obstype %s to registry" %
                                      (infile, obstype))
                        continue
                    hduInfo['path'] = infile
                    if args.validity is not None:
                        try:
                            calibDate = _convertToDate(hduInfo['calibDate'])
                            halfWidth = datetime.timedelta(args.validity)
                            # calibDate is None when the date could not be
                            # parsed; the arithmetic then raises TypeError.
                            hduInfo['validStart'] = (calibDate - halfWidth).isoformat()
                            hduInfo['validEnd'] = (calibDate + halfWidth).isoformat()
                        except TypeError:
                            self.log.warn("Skipped setting validity period of %s" %
                                          args.validity)
                    self.register.addRow(registry, hduInfo, dryrun=isDryRun,
                                         create=args.create, table=obstype)
            if not isDryRun:
                self.register.updateValidityRanges(registry)
            else:
                self.log.info("Would update validity ranges to resolve overlaps")

0 comments on commit 0fa09db

Please sign in to comment.