Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable multiple ingest calls (e.g. by OODS). #329

Merged
merged 6 commits into from
Nov 20, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
135 changes: 102 additions & 33 deletions python/lsst/pipe/tasks/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@

import os
import shutil
import tempfile
import sqlite3
import sys
import tempfile
from fnmatch import fnmatch
from glob import glob
from contextlib import contextmanager
Expand Down Expand Up @@ -361,39 +362,26 @@ def addRow(self, conn, info, dryrun=False, create=False, table=None):
"""
if table is None:
table = self.config.table
sql = "INSERT INTO %s (%s) SELECT " % (table, ",".join(self.config.columns))
sql += ",".join([self.placeHolder] * len(self.config.columns))
values = [self.typemap[tt](info[col]) for col, tt in self.config.columns.items()]

ignoreClause = ""
if self.config.ignore:
sql += " WHERE NOT EXISTS (SELECT 1 FROM %s WHERE " % table
sql += " AND ".join(["%s=%s" % (col, self.placeHolder) for col in self.config.unique])
sql += ")"
values += [info[col] for col in self.config.unique]
ignoreClause = " OR IGNORE"
sql = "INSERT%s INTO %s (%s) VALUES (" % (ignoreClause, table, ",".join(self.config.columns))
sql += ",".join([self.placeHolder] * len(self.config.columns)) + ")"
values = [self.typemap[tt](info[col]) for col, tt in self.config.columns.items()]

if dryrun:
print("Would execute: '%s' with %s" % (sql, ",".join([str(value) for value in values])))
else:
conn.cursor().execute(sql, values)

def addVisits(self, conn, dryrun=False, table=None):
"""Generate the visits table (typically 'raw_visits') from the
file table (typically 'raw').
sql = "INSERT OR IGNORE INTO %s_visit VALUES (" % table
sql += ",".join([self.placeHolder] * len(self.config.visit)) + ")"
values = [self.typemap[self.config.columns[col]](info[col]) for col in self.config.visit]

@param conn Database connection
@param table Name of table in database
"""
if table is None:
table = self.config.table
sql = "INSERT INTO %s_visit SELECT DISTINCT " % table
sql += ",".join(self.config.visit)
sql += " FROM %s AS vv1" % table
sql += " WHERE NOT EXISTS "
sql += "(SELECT vv2.visit FROM %s_visit AS vv2 WHERE vv1.visit = vv2.visit)" % (table,)
if dryrun:
print("Would execute: %s" % sql)
print("Would execute: '%s' with %s" % (sql, ",".join([str(value) for value in values])))
else:
conn.cursor().execute(sql)
conn.cursor().execute(sql, values)


class IngestConfig(Config):
Expand All @@ -404,6 +392,13 @@ class IngestConfig(Config):
clobber = Field(dtype=bool, default=False, doc="Clobber existing file?")


class IngestError(RuntimeError):
    """Failure to ingest or register one file of an ingest request.

    Parameters
    ----------
    message : `str`
        Human-readable description of the failure.
    pathname : `str`
        Pathname of the file that was being processed.
    position : `int`
        Zero-based index of that file in the list of files being ingested.
    """

    def __init__(self, message, pathname, position):
        super().__init__(message)
        self.pathname = pathname
        self.position = position

    def __reduce__(self):
        """Support `pickle` and `copy`.

        Only ``message`` is stored in ``args``, so the default exception
        reduction would rebuild the object as ``IngestError(message)`` and
        fail on the two missing required arguments.
        """
        message = self.args[0] if self.args else ""
        return (type(self), (message, self.pathname, self.position),
                dict(self.__dict__))


class IngestTask(Task):
"""Task that will ingest images into the data repository"""
ConfigClass = IngestConfig
Expand All @@ -416,14 +411,66 @@ def __init__(self, *args, **kwargs):
self.makeSubtask("register")

@classmethod
def parseAndRun(cls):
"""Parse the command-line arguments and run the Task"""
def _parse(cls):
    """Parse the command line and build a Task from the resulting config.

    Returns
    -------
    task
        Instance of ``cls`` constructed with the parsed configuration.
    args
        The parsed arguments, as returned by the argument parser.
    """
    defaultConfig = cls.ConfigClass()
    parsed = cls.ArgumentParser(name=cls._DefaultName).parse_args(defaultConfig)
    return cls(config=parsed.config), parsed

@classmethod
def parseAndRun(cls):
    """Command-line entry point: parse the arguments, then run the Task
    built from them."""
    ingestTask, parsedArgs = cls._parse()
    ingestTask.run(parsedArgs)

@classmethod
def prepareTask(cls, root=None, dryrun=False, mode="move", create=False,
                ignoreIngested=False):
    """Prepare for running the task repeatedly with `ingestFiles`.

    Builds a synthetic command line from the arguments, parses it with
    `_parse`, and saves the parsed arguments on the returned task as the
    private ``_args`` attribute.

    Parameters
    ----------
    root : `str`, optional
        Repository root pathname.  If None, run the Task using the
        command line arguments, ignoring all other arguments below.
    dryrun : `bool`, optional
        If True, don't perform any action; log what would have happened.
    mode : `str`, optional
        How files are delivered to their destination.  Default is "move",
        unlike the command-line default of "link".
    create : `bool`, optional
        If True, create a new registry, clobbering any old one present.
    ignoreIngested : `bool`, optional
        If True, do not complain if the file is already present in the
        registry (and do nothing else).

    Returns
    -------
    task : `IngestTask` or `None`
        If `root` was provided, the IngestTask instance; otherwise `None`.
    """
    if root is None:
        # No repository given: honour the documented fallback and let the
        # real command line drive the run, instead of putting None into
        # the argument list.
        cls.parseAndRun()
        return None

    argv = ["IngestTask", root]
    if dryrun:
        argv.append("--dry-run")
    argv += ["--mode", mode]
    if create:
        argv.append("--create")
    if ignoreIngested:
        argv.append("--ignore-ingested")
    argv.append("__fakefile__")  # needed for parsing, not used

    # `_parse` takes no argument list, so the synthetic command line is
    # handed over through sys.argv; restore the real one afterwards so the
    # rest of the process does not see the fake arguments.
    savedArgv = sys.argv
    sys.argv = argv
    try:
        task, args = cls._parse()
    finally:
        sys.argv = savedArgv
    task._args = args
    return task

def ingest(self, infile, outfile, mode="move", dryrun=False):
"""Ingest a file into the image repository.

Expand All @@ -443,10 +490,10 @@ def ingest(self, infile, outfile, mode="move", dryrun=False):
if not os.path.isdir(outdir):
try:
os.makedirs(outdir)
except OSError:
except OSError as exc:
# Silently ignore mkdir failures due to race conditions
if not os.path.isdir(outdir):
raise
raise RuntimeError(f"Failed to create directory {outdir}") from exc
if os.path.lexists(outfile):
if self.config.clobber:
os.unlink(outfile)
Expand All @@ -467,7 +514,7 @@ def ingest(self, infile, outfile, mode="move", dryrun=False):
except Exception as e:
self.log.warn("Failed to %s %s to %s: %s" % (mode, infile, outfile, e))
if not self.config.allowError:
raise
raise RuntimeError(f"Failed to {mode} {infile} to {outfile}") from e
return False
return True

Expand Down Expand Up @@ -529,7 +576,7 @@ def runFile(self, infile, registry, args):
fileInfo, hduInfoList = self.parse.getInfo(infile)
except Exception as e:
if not self.config.allowError:
raise
raise RuntimeError(f"Error parsing {infile}") from e
self.log.warn("Error parsing %s (%s); skipping" % (infile, e))
return None
if self.isBadId(fileInfo, args.badId.idList):
Expand All @@ -550,17 +597,39 @@ def run(self, args):
root = args.input
context = self.register.openRegistry(root, create=args.create, dryrun=args.dryrun)
with context as registry:
for infile in filenameList:
for pos in range(len(filenameList)):
infile = filenameList[pos]
try:
hduInfoList = self.runFile(infile, registry, args)
except Exception as exc:
self.log.warn("Failed to ingest file %s: %s", infile, exc)
if not self.config.allowError:
raise IngestError(f"Failed to ingest file {infile}", infile, pos) from exc
continue
if hduInfoList is None:
continue
for info in hduInfoList:
self.register.addRow(registry, info, dryrun=args.dryrun, create=args.create)
self.register.addVisits(registry, dryrun=args.dryrun)
try:
self.register.addRow(registry, info, dryrun=args.dryrun, create=args.create)
except Exception as exc:
raise IngestError(f"Failed to register file {infile}", infile, pos) from exc

def ingestFiles(self, fileList):
    """Ingest specified file or list of files and add them to the registry.

    This method can only be called if `prepareTask` was used.

    Parameters
    ----------
    fileList : `str`, `os.PathLike`, or iterable of these
        Pathname or list of pathnames of files to ingest.

    Raises
    ------
    RuntimeError
        Raised if the task has no saved arguments, i.e. it was not created
        with `prepareTask`.
    """
    if not hasattr(self, "_args"):
        raise RuntimeError("Task not created with prepareTask")
    # A lone path-like object (e.g. pathlib.Path) is a single pathname, not
    # a collection, so wrap it exactly like a lone string.
    if isinstance(fileList, (str, os.PathLike)):
        fileList = [fileList]
    # Normalize every entry to a plain pathname before handing over to run.
    self._args.files = [os.fspath(name) for name in fileList]
    self.run(self._args)


def assertCanCopy(fromPath, toPath):
Expand Down