Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-10267: Port HSC support for PostgreSQL registries to LSST #121

Merged
merged 6 commits into from
May 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions bin.src/ingestImagesPgsql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/usr/bin/env python
from lsst.pipe.tasks.ingestPgsql import PgsqlIngestTask
PgsqlIngestTask.parseAndRun()
52 changes: 33 additions & 19 deletions python/lsst/pipe/tasks/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import sqlite as sqlite3
from fnmatch import fnmatch
from glob import glob
from contextlib import contextmanager

from lsst.pex.config import Config, Field, DictField, ListField, ConfigurableField
import lsst.pex.exceptions
Expand Down Expand Up @@ -209,6 +210,8 @@ def __init__(self, registryName, createTableFunc, forceCreateTables, permissions

@param registryName: Name of registry file
@param createTableFunc: Function to create tables
@param forceCreateTables: Force the (re-)creation of tables?
@param permissions: Permissions to set on database file
"""
self.registryName = registryName
self.permissions = permissions
Expand Down Expand Up @@ -245,9 +248,20 @@ def __exit__(self, excType, excValue, traceback):
return False # Don't suppress any exceptions


@contextmanager
def fakeContext():
"""A context manager that doesn't provide any context

Useful for dry runs where we don't want to actually do anything real.
"""
yield


class RegisterTask(Task):
"""Task that will generate the registry for the Mapper"""
ConfigClass = RegisterConfig
placeHolder = '?' # Placeholder for parameter substitution; this value suitable for sqlite3
typemap = {'text': str, 'int': int, 'double': float} # Mapping database type --> python type

def openRegistry(self, directory, create=False, dryrun=False, name="registry.sqlite3"):
"""Open the registry and return the connection handle.
Expand All @@ -259,11 +273,6 @@ def openRegistry(self, directory, create=False, dryrun=False, name="registry.sql
@return Database connection
"""
if dryrun:
from contextlib import contextmanager

@contextmanager
def fakeContext():
yield
return fakeContext()

registryName = os.path.join(directory, name)
Expand All @@ -286,13 +295,13 @@ def createTable(self, conn, table=None):
if len(self.config.unique) > 0:
cmd += ", unique(" + ",".join(self.config.unique) + ")"
cmd += ")"
conn.execute(cmd)
conn.cursor().execute(cmd)

cmd = "create table %s_visit (" % table
cmd += ",".join([("%s %s" % (col, self.config.columns[col])) for col in self.config.visit])
cmd += ", unique(" + ",".join(set(self.config.visit).intersection(set(self.config.unique))) + ")"
cmd += ")"
conn.execute(cmd)
conn.cursor().execute(cmd)

conn.commit()

Expand All @@ -307,8 +316,8 @@ def check(self, conn, info, table=None):
return False # Our entry could already be there, but we don't care
cursor = conn.cursor()
sql = "SELECT COUNT(*) FROM %s WHERE " % table
sql += " AND ".join(["%s=?" % col for col in self.config.unique])
values = [info[col] for col in self.config.unique]
sql += " AND ".join(["%s = %s" % (col, self.placeHolder) for col in self.config.unique])
values = [self.typemap[self.config.columns[col]](info[col]) for col in self.config.unique]

cursor.execute(sql, values)
if cursor.fetchone()[0] > 0:
Expand All @@ -324,17 +333,20 @@ def addRow(self, conn, info, dryrun=False, create=False, table=None):
"""
if table is None:
table = self.config.table
sql = "INSERT"
sql = "INSERT INTO %s (%s) SELECT " % (table, ",".join(self.config.columns))
sql += ",".join([self.placeHolder] * len(self.config.columns))
values = [self.typemap[tt](info[col]) for col, tt in self.config.columns.items()]

if self.config.ignore:
sql += " OR IGNORE"
sql += " INTO %s VALUES (NULL" % table
sql += ", ?" * len(self.config.columns)
sql += ")"
values = [info[col] for col in self.config.columns]
sql += " WHERE NOT EXISTS (SELECT 1 FROM %s WHERE " % self.config.table
sql += " AND ".join(["%s=%s" % (col, self.placeHolder) for col in self.config.unique])
sql += ")"
values += [info[col] for col in self.config.unique]

if dryrun:
print("Would execute: '%s' with %s" % (sql, ",".join([str(value) for value in values])))
else:
conn.execute(sql, values)
conn.cursor().execute(sql, values)

def addVisits(self, conn, dryrun=False, table=None):
"""Generate the visits table (typically 'raw_visits') from the
Expand All @@ -345,13 +357,15 @@ def addVisits(self, conn, dryrun=False, table=None):
"""
if table is None:
table = self.config.table
sql = "INSERT OR IGNORE INTO %s_visit SELECT DISTINCT " % table
sql = "INSERT INTO %s_visit SELECT DISTINCT " % table
sql += ",".join(self.config.visit)
sql += " FROM %s" % table
sql += " FROM %s AS vv1" % table
sql += " WHERE NOT EXISTS "
sql += "(SELECT vv2.visit FROM %s_visit AS vv2 WHERE vv1.visit = vv2.visit)" % (table,)
if dryrun:
print("Would execute: %s" % sql)
else:
conn.execute(sql)
conn.cursor().execute(sql)


class IngestConfig(Config):
Expand Down
111 changes: 111 additions & 0 deletions python/lsst/pipe/tasks/ingestPgsql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from __future__ import absolute_import, division, print_function

import os

from lsst.pex.config import ConfigurableField
from lsst.pipe.tasks.ingest import IngestTask, IngestConfig, RegisterTask, RegistryContext, fakeContext
from lsst.daf.persistence.registries import PgsqlRegistry

try:
import psycopg2 as pgsql
havePgSql = True
except ImportError:
havePgSql = False


class PgsqlRegistryContext(RegistryContext):
"""Context manager to provide a pgsql registry
"""
def __init__(self, registryName, createTableFunc, forceCreateTables):
"""Construct a context manager

@param registryName: Name of registry file
@param createTableFunc: Function to create tables
@param forceCreateTables: Force the (re-)creation of tables?
"""
self.registryName = registryName
data = PgsqlRegistry.readYaml(registryName)
self.conn = pgsql.connect(host=data["host"], port=data["port"], user=data["user"],
password=data["password"], database=data["database"])
cur = self.conn.cursor()

# Check for existence of tables
cur.execute("SELECT relname FROM pg_class WHERE relkind='r' AND relname='raw'")
rows = cur.fetchall()

if forceCreateTables or len(rows) == 0:
# Delete all tables and start over.
# Not simply doing "DROP SCHEMA" and "CREATE SCHEMA" because of permissions.
cur.execute("SELECT tablename FROM pg_tables WHERE schemaname = 'public'")
tables = cur.fetchall()
for tt in tables:
cur.execute("DROP TABLE %s CASCADE" % tt)
createTableFunc(self.conn)

def __exit__(self, excType, excValue, traceback):
self.conn.commit()
self.conn.close()
return False # Don't suppress any exceptions


class PgsqlRegisterTask(RegisterTask):
placeHolder = "%s"

def openRegistry(self, directory, create=False, dryrun=False):
"""Open the registry and return the connection handle.

@param directory Directory in which the registry file will be placed
@param create Clobber any existing registry and create a new one?
@param dryrun Don't do anything permanent?
@return Database connection
"""
if dryrun:
return fakeContext()
registryName = os.path.join(directory, "registry.pgsql")
return PgsqlRegistryContext(registryName, self.createTable, create)

def createTable(self, conn, table=None):
"""Create the registry tables

One table (typically 'raw') contains information on all files, and the
other (typically 'raw_visit') contains information on all visits.

This method is required because there's a slightly different syntax
compared to SQLite (FLOAT instead of DOUBLE, SERIAL instead of
AUTOINCREMENT).

@param conn Database connection
@param table Name of table to create in database
"""
if table is None:
table = self.config.table

typeMap = {'int': 'INT',
'double': 'FLOAT', # Defaults to double precision
}

cur = conn.cursor()
cmd = "CREATE TABLE %s (id SERIAL NOT NULL PRIMARY KEY, " % table
cmd += ",".join(["%s %s" % (col, typeMap.get(colType.lower(), 'text')) for
col, colType in self.config.columns.items()])
if len(self.config.unique) > 0:
cmd += ", UNIQUE(" + ",".join(self.config.unique) + ")"
cmd += ")"
cur.execute(cmd)
Copy link
Contributor

@n8pease n8pease May 9, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought when using dbapi, it was preferred to let the cursor.execute(operation, parameters) method substitute the values into the string, for protection against Bobby Tables. It looks like you're doing the substitutions above. Are you sure you're safe?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had Little Bobby Tables in my mind as I was working, but it doesn't look like the placeholders work when used for table names or types --- just values.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aha, yes. I had that experience too.
I'm not sure what the security requirements are in that case. @timj?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we always used ? placeholders.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does the db library have a method for cleaning external input?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when you call execute, and pass a string with placeholders in for operation and parameters in, the parameters will be cleaned before substituted into the operation string.
the placeholder character seems to depend on paramstyle(?)


cmd = "CREATE TABLE %s_visit (" % self.config.table
cmd += ",".join(["%s %s" % (col, typeMap.get(self.config.columns[col].lower(), 'TEXT')) for
col in self.config.visit])
cmd += ", UNIQUE(" + ",".join(set(self.config.visit).intersection(set(self.config.unique))) + ")"
cmd += ")"
cur.execute(cmd)
del cur
conn.commit()


class PgsqlIngestConfig(IngestConfig):
register = ConfigurableField(target=PgsqlRegisterTask, doc="Registry entry")


class PgsqlIngestTask(IngestTask):
ConfigClass = PgsqlIngestConfig