-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DM-18739: allow ingesting defects with a slim wrapper on ingestCalibs #295
Changes from all commits
97f70e4
8bcc79a
db6551a
30bf333
162744b
9cf8547
7821591
89e29a4
e76587e
50ce2dd
7fcc77b
9a280b8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
#!/usr/bin/env python | ||
from lsst.pipe.tasks.ingestDefects import IngestDefectsTask | ||
IngestDefectsTask.parseAndRun() |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -239,16 +239,13 @@ def __init__(self, registryName, createTableFunc, forceCreateTables, permissions | |
delete=False) | ||
self.updateName = updateFile.name | ||
|
||
haveTable = False | ||
if os.path.exists(registryName): | ||
assertCanCopy(registryName, self.updateName) | ||
os.chmod(self.updateName, os.stat(registryName).st_mode) | ||
shutil.copyfile(registryName, self.updateName) | ||
haveTable = True | ||
|
||
self.conn = sqlite3.connect(self.updateName) | ||
if not haveTable or forceCreateTables: | ||
createTableFunc(self.conn) | ||
createTableFunc(self.conn, forceCreateTables=forceCreateTables) | ||
os.chmod(self.updateName, self.permissions) | ||
|
||
def __enter__(self): | ||
|
@@ -298,7 +295,7 @@ def openRegistry(self, directory, create=False, dryrun=False, name="registry.sql | |
context = RegistryContext(registryName, self.createTable, create, self.config.permissions) | ||
return context | ||
|
||
def createTable(self, conn, table=None): | ||
def createTable(self, conn, table=None, forceCreateTables=False): | ||
"""Create the registry tables | ||
|
||
One table (typically 'raw') contains information on all files, and the | ||
|
@@ -307,20 +304,32 @@ def createTable(self, conn, table=None): | |
@param conn Database connection | ||
@param table Name of table to create in database | ||
""" | ||
cursor = conn.cursor() | ||
if table is None: | ||
table = self.config.table | ||
cmd = "SELECT name FROM sqlite_master WHERE type='table' AND name='%s'" % table | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I strongly suggest you leave
In cases where you don't think it's worth the effort please consider using f strings, as they are easier to read. |
||
cursor.execute(cmd) | ||
if cursor.fetchone() and not forceCreateTables: # Assume if we get an answer the table exists | ||
self.log.info('Table "%s" exists. Skipping creation' % table) | ||
return | ||
else: | ||
cmd = "drop table if exists %s" % table | ||
cursor.execute(cmd) | ||
cmd = "drop table if exists %s_visit" % table | ||
cursor.execute(cmd) | ||
|
||
cmd = "create table %s (id integer primary key autoincrement, " % table | ||
cmd += ",".join([("%s %s" % (col, colType)) for col, colType in self.config.columns.items()]) | ||
if len(self.config.unique) > 0: | ||
cmd += ", unique(" + ",".join(self.config.unique) + ")" | ||
cmd += ")" | ||
conn.cursor().execute(cmd) | ||
cursor.execute(cmd) | ||
|
||
cmd = "create table %s_visit (" % table | ||
cmd += ",".join([("%s %s" % (col, self.config.columns[col])) for col in self.config.visit]) | ||
cmd += ", unique(" + ",".join(set(self.config.visit).intersection(set(self.config.unique))) + ")" | ||
cmd += ")" | ||
conn.cursor().execute(cmd) | ||
cursor.execute(cmd) | ||
|
||
conn.commit() | ||
|
||
|
@@ -451,7 +460,7 @@ def ingest(self, infile, outfile, mode="move", dryrun=False): | |
os.symlink(os.path.abspath(infile), outfile) | ||
elif mode == "move": | ||
assertCanCopy(infile, outfile) | ||
os.rename(infile, outfile) | ||
shutil.move(infile, outfile) | ||
else: | ||
raise AssertionError("Unknown mode: %s" % mode) | ||
self.log.info("%s --<%s>--> %s" % (infile, mode, outfile)) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
import collections | ||
import datetime | ||
import sqlite3 | ||
from dateutil import parser | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider |
||
|
||
from lsst.afw.fits import readMetadata | ||
from lsst.pex.config import Config, Field, ListField, ConfigurableField | ||
|
@@ -10,7 +11,7 @@ | |
|
||
def _convertToDate(dateString): | ||
"""Convert a string into a date object""" | ||
return datetime.datetime.strptime(dateString, "%Y-%m-%d").date() | ||
return parser.parse(dateString).date() | ||
|
||
|
||
class CalibsParseTask(ParseTask): | ||
|
@@ -40,6 +41,8 @@ def getCalibType(self, filename): | |
obstype = "sky" | ||
elif "illumcor" in obstype: | ||
obstype = "illumcor" | ||
elif "defects" in obstype: | ||
obstype = "defects" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is getting a bit long and the stutter worries me (a typo could easily creep in). Consider a loop, such as:
|
||
return obstype | ||
|
||
def getDestination(self, butler, info, filename): | ||
|
@@ -65,13 +68,14 @@ def getDestination(self, butler, info, filename): | |
|
||
class CalibsRegisterConfig(RegisterConfig): | ||
"""Configuration for the CalibsRegisterTask""" | ||
tables = ListField(dtype=str, default=["bias", "dark", "flat", "fringe", "sky"], doc="Names of tables") | ||
tables = ListField(dtype=str, default=["bias", "dark", "flat", "fringe", "sky", "defects"], | ||
doc="Names of tables") | ||
calibDate = Field(dtype=str, default="calibDate", doc="Name of column for calibration date") | ||
validStart = Field(dtype=str, default="validStart", doc="Name of column for validity start") | ||
validEnd = Field(dtype=str, default="validEnd", doc="Name of column for validity stop") | ||
detector = ListField(dtype=str, default=["filter", "ccd"], | ||
doc="Columns that identify individual detectors") | ||
validityUntilSuperseded = ListField(dtype=str, default=["defect"], | ||
validityUntilSuperseded = ListField(dtype=str, default=["defects"], | ||
doc="Tables for which to set validity for a calib from when it is " | ||
"taken until it is superseded by the next; validity in other tables " | ||
"is calculated by applying the validity range.") | ||
|
@@ -85,18 +89,18 @@ def openRegistry(self, directory, create=False, dryrun=False, name="calibRegistr | |
"""Open the registry and return the connection handle""" | ||
return RegisterTask.openRegistry(self, directory, create, dryrun, name) | ||
|
||
def createTable(self, conn): | ||
def createTable(self, conn, forceCreateTables=False): | ||
"""Create the registry tables""" | ||
for table in self.config.tables: | ||
RegisterTask.createTable(self, conn, table=table) | ||
RegisterTask.createTable(self, conn, table=table, forceCreateTables=forceCreateTables) | ||
|
||
def addRow(self, conn, info, *args, **kwargs): | ||
"""Add a row to the file table""" | ||
info[self.config.validStart] = None | ||
info[self.config.validEnd] = None | ||
RegisterTask.addRow(self, conn, info, *args, **kwargs) | ||
|
||
def updateValidityRanges(self, conn, validity): | ||
def updateValidityRanges(self, conn, validity, tables=None): | ||
"""Loop over all tables, filters, and ccdnums, | ||
and update the validity ranges in the registry. | ||
|
||
|
@@ -105,7 +109,9 @@ def updateValidityRanges(self, conn, validity): | |
""" | ||
conn.row_factory = sqlite3.Row | ||
cursor = conn.cursor() | ||
for table in self.config.tables: | ||
if tables is None: | ||
tables = self.config.tables | ||
for table in tables: | ||
sql = "SELECT DISTINCT %s FROM %s" % (", ".join(self.config.detector), table) | ||
cursor.execute(sql) | ||
rows = cursor.fetchall() | ||
|
@@ -188,11 +194,6 @@ def __init__(self, *args, **kwargs): | |
help="Mode of delivering the files to their destination") | ||
self.add_argument("--create", action="store_true", help="Create new registry?") | ||
self.add_argument("--validity", type=int, required=True, help="Calibration validity period (days)") | ||
self.add_argument("--calibType", type=str, default=None, | ||
choices=[None, "bias", "dark", "flat", "fringe", "sky", "defect"], | ||
help="Type of the calibration data to be ingested;" | ||
" if omitted, the type is determined from" | ||
" the file header information") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If calibType can no longer be specified on the command line, I am pretty sure decam fringe ingestion will break, although it is already pretty broken (it must presently be run with mode=skip and the files must be moved/copied/linked manually to the calib repo). |
||
self.add_argument("--ignore-ingested", dest="ignoreIngested", action="store_true", | ||
help="Don't register files that have already been registered") | ||
self.add_argument("files", nargs="+", help="Names of file") | ||
|
@@ -217,17 +218,16 @@ def run(self, args): | |
calibRoot = args.calib if args.calib is not None else args.output | ||
filenameList = self.expandFiles(args.files) | ||
with self.register.openRegistry(calibRoot, create=args.create, dryrun=args.dryrun) as registry: | ||
calibTypes = set() | ||
for infile in filenameList: | ||
fileInfo, hduInfoList = self.parse.getInfo(infile) | ||
if args.calibType is None: | ||
calibType = self.parse.getCalibType(infile) | ||
else: | ||
calibType = args.calibType | ||
calibType = self.parse.getCalibType(infile) | ||
if calibType not in self.register.config.tables: | ||
self.log.warn(str("Skipped adding %s of observation type '%s' to registry " | ||
"(must be one of %s)" % | ||
(infile, calibType, ", ".join(self.register.config.tables)))) | ||
continue | ||
calibTypes.add(calibType) | ||
if args.mode != 'skip': | ||
outfile = self.parse.getDestination(args.butler, fileInfo, infile) | ||
ingested = self.ingest(infile, outfile, mode=args.mode, dryrun=args.dryrun) | ||
|
@@ -244,6 +244,6 @@ def run(self, args): | |
self.register.addRow(registry, info, dryrun=args.dryrun, | ||
create=args.create, table=calibType) | ||
if not args.dryrun: | ||
self.register.updateValidityRanges(registry, args.validity) | ||
self.register.updateValidityRanges(registry, args.validity, tables=calibTypes) | ||
else: | ||
self.log.info("Would update validity ranges here, but dryrun") |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
from .ingestCalibs import IngestCalibsTask | ||
from .read_defects import read_all_defects | ||
from lsst.pipe.base import InputOnlyArgumentParser | ||
|
||
import tempfile | ||
import shutil | ||
import os | ||
|
||
|
||
class IngestDefectsArgumentParser(InputOnlyArgumentParser): | ||
"""Argument parser to support ingesting calibration images into the repository""" | ||
|
||
def __init__(self, *args, **kwargs): | ||
InputOnlyArgumentParser.__init__(self, *args, **kwargs) | ||
self.add_argument("-n", "--dry-run", dest="dryrun", action="store_true", | ||
default=False, help="Don't perform any action?") | ||
self.add_argument("--create", action="store_true", help="Create new registry?") | ||
self.add_argument("--ignore-ingested", dest="ignoreIngested", action="store_true", | ||
help="Don't register files that have already been registered") | ||
self.add_argument("root", help="Root directory to scan for defects.") | ||
|
||
|
||
class IngestDefectsTask(IngestCalibsTask): | ||
"""Task that generates registry for calibration images""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This doc string is wrong because this only handles defects. Consider copying the "run" doc string, since "Task that..." is not very helpful (it is clearly a task) and doc strings are supposed to be in active voice. At one time task doc strings were supposed to be extensive, with sections for examples, debugging (which debug flags are supported)...is that still the case? |
||
ArgumentParser = IngestDefectsArgumentParser | ||
_DefaultName = "ingestDefects" | ||
|
||
def run(self, args): | ||
"""Ingest all defect files and add them to the registry""" | ||
|
||
try: | ||
camera = args.butler.get('camera') | ||
temp_dir = tempfile.mkdtemp() | ||
defects = read_all_defects(args.root, camera) | ||
file_names = [] | ||
for d in defects: | ||
for s in defects[d]: | ||
file_name = f'defects_{d}_{s.isoformat()}.fits' | ||
full_file_name = os.path.join(temp_dir, file_name) | ||
self.log.info('%i defects written for sensor: %s and calibDate: %s' % | ||
(len(defects[d][s]), d, s.isoformat())) | ||
defects[d][s].writeFits(full_file_name) | ||
file_names.append(full_file_name) | ||
args.files = file_names | ||
args.mode = 'move' | ||
args.validity = None # Validity range is determined from the files | ||
IngestCalibsTask.run(self, args) | ||
except Exception: | ||
raise(Exception) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This except/raise does nothing useful. Please delete it. |
||
finally: | ||
shutil.rmtree(temp_dir) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
from lsst.meas.algorithms import Defects | ||
import os | ||
import glob | ||
import dateutil.parser | ||
|
||
|
||
def read_defects_one_chip(root, chip_name, chip_id): | ||
"""Read defects for a particular sensor from the standard format at a particular root. | ||
|
||
Parameters | ||
---------- | ||
root : str | ||
Path to the top level of the defects tree. This is expected to hold directories | ||
named after the sensor names. They are expected to be lower case. | ||
chip_name : str | ||
The name of the sensor for which to read defects. | ||
chip_id : int | ||
The identifier for the sensor in question. | ||
|
||
Returns | ||
------- | ||
dict | ||
A dictionary of `lsst.meas.algorithms.Defects`. | ||
The key is the validity start time as a `datetime` object. | ||
""" | ||
files = glob.glob(os.path.join(root, chip_name, '*.ecsv')) | ||
parts = os.path.split(root) | ||
instrument = os.path.split(parts[0])[1] # convention is that these reside at <instrument>/defects | ||
defect_dict = {} | ||
for f in files: | ||
date_str = os.path.splitext(os.path.basename(f))[0] | ||
valid_start = dateutil.parser.parse(date_str) | ||
defect_dict[valid_start] = Defects.readText(f) | ||
check_metadata(defect_dict[valid_start], valid_start, instrument, chip_id, f) | ||
return defect_dict | ||
|
||
|
||
def check_metadata(defects, valid_start, instrument, chip_id, f): | ||
"""Check that the metadata is complete and self consistent | ||
|
||
Parameters | ||
---------- | ||
defects : `lsst.meas.algorithms.Defects` | ||
Object to retrieve metadata from in order to compare with | ||
metadata inferred from the path. | ||
valid_start : datetime | ||
Start of the validity range for defects | ||
instrument : str | ||
Name of the instrument in question | ||
chip_id : int | ||
Identifier of the sensor in question | ||
f : str | ||
Path of the file read to produce ``defects`` | ||
|
||
Returns | ||
------- | ||
None | ||
|
||
Raises | ||
------ | ||
ValueError | ||
If the metadata from the path and the metadata encoded | ||
in the path do not match for any reason. | ||
""" | ||
md = defects.getMetadata() | ||
finst = md.get('INSTRUME') | ||
fchip_id = md.get('DETECTOR') | ||
fcalib_date = md.get('CALIBDATE') | ||
if not (finst, int(fchip_id), fcalib_date) == (instrument, chip_id, valid_start.isoformat()): | ||
raise ValueError("Path and file metadata do not agree:\n" + | ||
"Path metadata: %s, %s, %s\n"%(instrument, chip_id, valid_start.isoformat()) + | ||
"File metadata: %s, %s, %s\n"%(finst, fchip_id, fcalib_date) + | ||
"File read from : %s\n"%(f) | ||
) | ||
|
||
|
||
def read_all_defects(root, camera): | ||
"""Read all defects from the standard format at a particular root. | ||
|
||
Parameters | ||
---------- | ||
root : str | ||
Path to the top level of the defects tree. This is expected to hold directories | ||
named after the sensor names. They are expected to be lower case. | ||
camera : `lsst.afw.cameraGeom.Camera` | ||
The camera that goes with the defects being read. | ||
|
||
Returns | ||
------- | ||
dict | ||
A dictionary of dictionaries of `lsst.meas.algorithms.Defects`. | ||
The first key is the sensor name, and the second is the validity | ||
start time as a `datetime` object. | ||
""" | ||
root = os.path.normpath(root) | ||
dirs = os.listdir(root) # assumes all directories contain defects | ||
dirs = [d for d in dirs if os.path.isdir(os.path.join(root, d))] | ||
defects_by_chip = {} | ||
name_map = {det.getName().lower(): det.getName() for | ||
det in camera} # we assume the directories have been lowered | ||
for d in dirs: | ||
chip_name = os.path.basename(d) | ||
chip_id = camera[name_map[chip_name]].getId() | ||
defects_by_chip[chip_name] = read_defects_one_chip(root, chip_name, chip_id) | ||
return defects_by_chip |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -132,20 +132,20 @@ def testProcessCcd(self): | |
print("psf Ixx = %r, Iyy = %r, Ixy = %r" % (psfIxx, psfIyy, psfIxy)) | ||
|
||
self.assertEqual(len(icSrc), 28) | ||
self.assertEqual(len(src), 186) | ||
self.assertEqual(len(src), 185) | ||
|
||
expectedPlaces = 7 # Tolerance for numerical comparisons | ||
for name, var, val in [ | ||
("bgMean", bgMean, 191.4862217336029), | ||
("bgStdDev", bgStdDev, 0.23986511599945562), | ||
("numGoodPix", numGoodPix, 1965471), | ||
("imMean", imMean, 1.1239582066430034), | ||
("imStdDev", imStdDev, 85.81319381115661), | ||
("varMean", varMean, 131.23984767404193), | ||
("varStdDev", varStdDev, 55.9802472085537), | ||
("psfIxx", psfIxx, 2.8540512421637554), | ||
("psfIyy", psfIyy, 2.1738662399061064), | ||
("psfIxy", psfIxy, 0.1439765855869371) | ||
("bgMean", bgMean, 191.48635852060525), | ||
("bgStdDev", bgStdDev, 0.2399466881603354), | ||
("numGoodPix", numGoodPix, 1966820), | ||
("imMean", imMean, 1.1237668985230562), | ||
("imStdDev", imStdDev, 85.81296241298496), | ||
("varMean", varMean, 131.24003624152013), | ||
("varStdDev", varStdDev, 55.98012493452948), | ||
("psfIxx", psfIxx, 2.769679536557131), | ||
("psfIyy", psfIyy, 2.2013649766299324), | ||
("psfIxy", psfIxy, 0.14797939531970852) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are only testing to 7 places; please consider shortening these values so they only have a few extra digits. It would make the test a bit easier to read (the interesting digits are lost in the middle). I realizing rounding by hand is no fun, but you could easily print the data to the desired precision and copy/paste or use Python to generate this whole block. |
||
]: | ||
self.assertAlmostEqual(var, val, places=expectedPlaces, msg=name) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please consider updating the doc strings in this file to sphinx.