
DM-15588: Remove home-brewed SQLite PPDB #53

Merged (8 commits, Nov 30, 2018)
7 changes: 4 additions & 3 deletions python/lsst/ap/verify/ap_verify.py
@@ -36,10 +36,10 @@
from .dataset import Dataset
from .ingestion import ingestDataset
from .metrics import MetricsParser, checkSquashReady, AutoJob
from .pipeline_driver import ApPipeParser, runApPipe
from .pipeline_driver import ApPipeParser, runApPipe, _getConfig
Member

I'd like to avoid forcing ap_verify.py to know about implementation details in pipeline_driver. My intent was that the measurements code could just load the config from the Butler, but I'm willing to do that myself on DM-15806.

Contributor Author

Okay, should I leave this as is for now and let DM-15806 take care of it, or is there something I can do in the meantime?

Member

It's up to you. I think the hardest part of that ticket would be adding a line(s) in pipeline_driver that saves the config to the Butler as if we'd called parseAndRun (much like what I do in #49).
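
For reference, a minimal sketch of that approach, assuming a Gen2 butler and the conventional `apPipe_config` dataset name (this is not code from the PR):

```python
import lsst.daf.persistence as dafPersist

def saveApPipeConfig(workspace, config):
    # Persist the config as CmdLineTask.parseAndRun would have done,
    # so downstream code can recover it from the repository.
    butler = dafPersist.Butler(outputs=workspace.outputRepo)
    butler.put(config, "apPipe_config")

def loadPpdbConfig(workspace):
    # Measurement code can then read the Ppdb settings back without
    # depending on pipeline_driver internals.
    butler = dafPersist.Butler(workspace.outputRepo)
    return butler.get("apPipe_config").ppdb
```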

Contributor Author

I think this is worth just making another ticket to do the work. It feels like it will take long enough to warrant it.

from .measurements import measureFromMetadata, \
measureFromButlerRepo, \
measureFromL1DbSqlite
measureFromPpdb
from .workspace import Workspace


@@ -151,7 +151,8 @@ def _measureFinalProperties(metricsJob, metadata, workspace, args):
measurements = []
measurements.extend(measureFromMetadata(metadata))
measurements.extend(measureFromButlerRepo(workspace.outputRepo, args.dataId))
measurements.extend(measureFromL1DbSqlite(workspace.dbLocation))
# TODO: Add butler storage and retrieval of the Ppdb config. DM-16645
measurements.extend(measureFromPpdb(_getConfig(workspace).ppdb))

for measurement in measurements:
metricsJob.measurements.insert(measurement)
13 changes: 5 additions & 8 deletions python/lsst/ap/verify/measurements/association.py
@@ -33,6 +33,7 @@

import astropy.units as u
import lsst.verify
from lsst.dax.ppdb import countUnassociatedObjects


def measureNumberNewDiaObjects(metadata, taskName, metricName):
@@ -199,14 +200,13 @@ def measureFractionDiaSourcesToSciSources(butler,
return meas


def measureTotalUnassociatedDiaObjects(dbCursor, metricName):
def measureTotalUnassociatedDiaObjects(ppdb, metricName):
""" Compute number of DIAObjects with only one association DIASource.

Parameters
----------
dbCursor : `sqlite3.Cursor`
Cursor to the sqlite data base created from a previous run of
AssociationDBSqlite task to load.
ppdb : `lsst.dax.ppdb.Ppdb`
Ppdb object connected to the relevant database.
metricName : `str`
The fully qualified name of the metric being measured, e.g.,
"ap_association.totalUnassociatedDiaObjects"
@@ -216,10 +216,7 @@ def measureTotalUnassociatedDiaObjects(dbCursor, metricName):
measurement : `lsst.verify.Measurement`
a value for `metricName`, or `None`
"""

dbCursor.execute("SELECT count(*) FROM dia_objects "
"WHERE nDiaSources = 1")
(nUnassociatedDiaObjects,) = dbCursor.fetchall()[0]
nUnassociatedDiaObjects = countUnassociatedObjects(ppdb)

meas = lsst.verify.Measurement(
metricName,
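For context, a hedged sketch of exercising the new measurement against an in-memory Ppdb, mirroring the test setup later in this PR (the schema arguments follow the diff; the rest is illustrative):

```python
import lsst.dax.ppdb as daxPpdb
from lsst.ap.association import make_dia_object_schema, make_dia_source_schema
from lsst.ap.verify.measurements.association import measureTotalUnassociatedDiaObjects

# Build an in-memory sqlite Ppdb so the sqlalchemy calls have a real backend.
config = daxPpdb.PpdbConfig()
config.db_url = "sqlite://"
config.isolation_level = "READ_UNCOMMITTED"
ppdb = daxPpdb.Ppdb(config=config,
                    afw_schemas=dict(DiaObject=make_dia_object_schema(),
                                     DiaSource=make_dia_source_schema()))
ppdb.makeSchema(drop=True)

# countUnassociatedObjects(ppdb) replaces the raw "SELECT count(*) ...
# WHERE nDiaSources = 1" query, so the measurement no longer hard-codes
# the table layout.
meas = measureTotalUnassociatedDiaObjects(
    ppdb, "ap_association.totalUnassociatedDiaObjects")
print(meas.quantity)  # 0, since nothing has been stored yet
```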
20 changes: 8 additions & 12 deletions python/lsst/ap/verify/measurements/compute_metrics.py
@@ -29,13 +29,13 @@

__all__ = ["measureFromMetadata",
"measureFromButlerRepo",
"measureFromL1DbSqlite"]
"measureFromPpdb"]

import sqlite3
import re

from lsst.ap.verify.config import Config
import lsst.daf.persistence as dafPersist
import lsst.dax.ppdb as daxPpdb
from .profiling import measureRuntime
from .association import measureNumberNewDiaObjects, \
measureNumberUnassociatedDiaObjects, \
@@ -159,22 +159,18 @@ def _convertDataIdString(dataId):
return dataIdDict


def measureFromL1DbSqlite(dbName):
"""Make measurements on an sqlite database containing the results of
def measureFromPpdb(config):
"""Make measurements on a ppdb database containing the results of
source association.

dbName : `str`
Name of the sqlite database created from a previous run of
`lsst.ap.association.AssociationDBSqliteTask` to load.
config : `lsst.dax.ppdb.PpdbConfig`
Ppdb configuration, as set by the ap_verify pipeline driver.
"""
dbConnection = sqlite3.connect(dbName)
dbCursor = dbConnection.cursor()

result = []
ppdb = daxPpdb.Ppdb(config=config)
measurement = measureTotalUnassociatedDiaObjects(
dbCursor, "ap_association.totalUnassociatedDiaObjects")
ppdb, "ap_association.totalUnassociatedDiaObjects")
if measurement is not None:
result.append(measurement)

dbConnection.close()
return result
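
A minimal usage sketch for the new entry point, assuming an existing workspace database (the `db_url` value is illustrative):

```python
import lsst.dax.ppdb as daxPpdb
from lsst.ap.verify.measurements import measureFromPpdb

ppdbConfig = daxPpdb.PpdbConfig()
ppdbConfig.db_url = "sqlite:///association.db"  # an existing pipeline output
ppdbConfig.isolation_level = "READ_UNCOMMITTED"

for measurement in measureFromPpdb(ppdbConfig):
    print(measurement.metric_name, measurement.quantity)
```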
8 changes: 5 additions & 3 deletions python/lsst/ap/verify/pipeline_driver.py
@@ -39,7 +39,6 @@

import lsst.log
import lsst.daf.persistence as dafPersist
from lsst.ap.association import AssociationDBSqliteTask
import lsst.ap.pipe as apPipe
from lsst.verify import Job

@@ -242,8 +241,11 @@ def _getConfig(workspace):

config = apPipe.ApPipeTask.ConfigClass()
# Equivalent to task-level default for ap_verify
config.associator.level1_db.retarget(AssociationDBSqliteTask)
config.associator.level1_db.db_name = workspace.dbLocation

# ApVerify will use the sqlite hooks for the Ppdb.
config.ppdb.db_url = "sqlite:///" + workspace.dbLocation
Member

I assume this is the part that made you ask about whether the path was absolute or relative. I take it something like "sqlite:///workspace/output/association.db" gets resolved as a relative and not an absolute path?

Contributor Author

Yes. sqlite:/// always takes a relative path, so the above example works. If workspace is an absolute path, the added / at the beginning means it "should" resolve to the correct absolute path.
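
To illustrate the URL convention (a hedged example of the SQLAlchemy behavior that dax_ppdb inherits; the paths are made up):

```python
import os

workspace = "workspace/output"                     # relative workspace
relativeUrl = "sqlite:///" + os.path.join(workspace, "association.db")
# "sqlite:///workspace/output/association.db": three slashes, so the
# path is resolved relative to the current working directory.

absWorkspace = "/home/user/workspace/output"       # absolute workspace
absoluteUrl = "sqlite:///" + os.path.join(absWorkspace, "association.db")
# "sqlite:////home/user/workspace/output/association.db": the leading
# "/" of the absolute path yields four slashes, which SQLAlchemy treats
# as an absolute location.
```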

config.ppdb.isolation_level = "READ_UNCOMMITTED"

for path in [
os.path.join(packageDir, 'config'),
os.path.join(packageDir, 'config', mapper.getCameraName()),
2 changes: 1 addition & 1 deletion python/lsst/ap/verify/workspace.py
@@ -100,7 +100,7 @@ def dbLocation(self):
created or updated by the pipeline (`str`, read-only).

Shall be a filename to a database file suitable
for `AssociationDBSqliteTask`.
for the sqlite backend of `Ppdb`.
"""
return os.path.join(self._location, 'association.db')
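
For example, a hedged sketch of how this property feeds the Ppdb URL (the Workspace constructor argument is an assumption):

```python
from lsst.ap.verify.workspace import Workspace

workspace = Workspace("my_run")  # hypothetical workspace root
url = "sqlite:///" + workspace.dbLocation
# e.g. "sqlite:///my_run/association.db", matching _getConfig above
```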

109 changes: 52 additions & 57 deletions tests/test_association.py
@@ -25,16 +25,16 @@
from unittest.mock import NonCallableMock

import astropy.units as u
import numpy as np
import os
import sqlite3

import lsst.daf.base as dafBase
import lsst.daf.persistence as dafPersist
import lsst.dax.ppdb as daxPpdb
import lsst.afw.geom as afwGeom
import lsst.afw.table as afwTable
from lsst.ap.association import \
make_minimal_dia_source_schema, \
make_minimal_dia_object_schema, \
make_dia_source_schema, \
make_dia_object_schema, \
AssociationTask
import lsst.pipe.base as pipeBase
import lsst.utils.tests
@@ -55,53 +55,44 @@
'filter': 'r'}


def createTestPoints(pointLocsDeg,
def createTestPoints(nPoints,
startId=0,
schema=None,
scatterArcsec=1.0,
indexerIds=None,
associatedIds=None):
schema=None):
"""Create dummy DIASources or DIAObjects for use in our tests.

Parameters
----------
pointLocsDeg : array-like (N, 2) of `float`s
Positions of the test points to create in RA, DEC.
nPoints : `int`
Number of data points to create.
startId : `int`
Unique id of the first object to create. The remaining sources are
incremented by one from the first id.
schema : `lsst.afw.table.Schema`
Schema of the objects to create. Defaults to the DIASource schema.
scatterArcsec : `float`
Scatter to add to the position of each DIASource.
indexerIds : `list` of `ints`s
Id numbers of pixelization indexer to store. Must be the same length
as the first dimension of point_locs_deg.
associatedIds : `list` of `ints`s
Id numbers of associated DIAObjects to store. Must be the same length
as the first dimension of point_locs_deg.

Returns
-------
testPoints : `lsst.afw.table.SourceCatalog`
Catalog of points to test.
"""
if schema is None:
schema = make_minimal_dia_source_schema()
schema = make_dia_source_schema()
sources = afwTable.SourceCatalog(schema)

for src_idx, (ra, dec,) in enumerate(pointLocsDeg):
for src_idx in range(nPoints):
src = sources.addNew()
# Set everything to a simple default value.
for subSchema in schema:
if subSchema.getField().getTypeString() == "Angle":
continue
elif subSchema.getField().getTypeString() == "String":
# Assume that the string column contains the filter name.
src[subSchema.getField().getName()] = 'g'
Member

It took me a while to figure out that you're assuming this is a filter field; might want to explicitly say that.

else:
src[subSchema.getField().getName()] = 1
# Set the ids by hand
src['id'] = src_idx + startId
coord = afwGeom.SpherePoint(ra, dec, afwGeom.degrees)
if scatterArcsec > 0.0:
coord = coord.offset(
np.random.rand() * 360 * afwGeom.degrees,
np.random.rand() * scatterArcsec * afwGeom.arcseconds)
if indexerIds is not None:
src['pixelId'] = indexerIds[src_idx]
if associatedIds is not None:
src['diaObjectId'] = associatedIds[src_idx]
coord = afwGeom.SpherePoint(src_idx, src_idx, afwGeom.degrees)
src.setCoord(coord)

return sources
Expand All @@ -117,12 +108,14 @@ def setUp(self):
# Create an empty butler repository and put data in it.
self.numTestSciSources = 10
self.numTestDiaSources = 5
testSources = createTestPoints(
pointLocsDeg=[[idx, idx] for idx in
range(self.numTestSciSources)])
testDiaSources = createTestPoints(
pointLocsDeg=[[idx, idx] for idx in
range(self.numTestDiaSources)])
testSources = createTestPoints(self.numTestSciSources)
testDiaSources = createTestPoints(self.numTestDiaSources)

self.numTestDiaObjects = 5
self.diaObjects = createTestPoints(
5, schema=make_dia_object_schema())
for diaObject in self.diaObjects:
diaObject['nDiaSources'] = 1

# Fake Butler to avoid initialization and I/O overhead
def mockGet(datasetType, dataId=None):
Expand All @@ -135,18 +128,27 @@ def mockGet(datasetType, dataId=None):
elif datasetType == 'deepDiff_diaSrc':
return testDiaSources
raise dafPersist.NoResults("Dataset not found:", datasetType, dataId)

self.butler = NonCallableMock(spec=dafPersist.Butler, get=mockGet)
Member

Can you please keep the NonCallableMock and mockGet definitions together? It's more confusing with the unrelated self.diaObjects definition in between.


self.numTestDiaObjects = 5
self.diaObjects = createTestPoints(
pointLocsDeg=[[idx, idx] for idx in
range(self.numTestDiaObjects)],
schema=make_minimal_dia_object_schema(['r']))
for diaObject in self.diaObjects:
diaObject['nDiaSources'] = 1
self.ppdbCfg = daxPpdb.PpdbConfig()
# Create DB in memory.
self.ppdbCfg.db_url = 'sqlite://'
self.ppdbCfg.isolation_level = "READ_UNCOMMITTED"
self.ppdbCfg.dia_object_index = "baseline"
self.ppdbCfg.dia_object_columns = []
self.ppdb = daxPpdb.Ppdb(
config=self.ppdbCfg,
afw_schemas=dict(DiaObject=make_dia_object_schema(),
DiaSource=make_dia_source_schema()))
self.ppdb.makeSchema(drop=True)

dateTime = dafBase.DateTime(nsecs=1400000000 * 10**9)
self.ppdb.storeDiaObjects(self.diaObjects, dateTime.toPython())

def tearDown(self):
del self.assocTask
del self.ppdb

if hasattr(self, "butler"):
del self.butler
@@ -194,10 +196,8 @@ def testValidFromMetadata(self):
self.assertEqual(
meas.metric_name,
lsst.verify.Name(metric='association.fracUpdatedDIAObjects'))
self.assertEqual(meas.quantity,
nUpdatedDiaObjects /
(nUpdatedDiaObjects + nUnassociatedDiaObjects) *
u.dimensionless_unscaled)
value = nUpdatedDiaObjects / (nUpdatedDiaObjects + nUnassociatedDiaObjects)
self.assertEqual(meas.quantity, value * u.dimensionless_unscaled)

def testValidFromButler(self):
""" Test the association measurements that require a butler.
Expand All @@ -223,13 +223,11 @@ def testValidFromButler(self):
self.assertEqual(meas.quantity,
self.numTestDiaSources / self.numTestSciSources * u.dimensionless_unscaled)

def testValidFromSqlite(self):
# Fake DB handle to avoid DB initialization overhead
cursor = NonCallableMock(spec=sqlite3.Cursor)
cursor.fetchall.return_value = [(len(self.diaObjects),)]

def testValidFromPpdb(self):
# Need to have a valid ppdb object so that the internal sqlalchemy
# calls work.
meas = measureTotalUnassociatedDiaObjects(
cursor,
self.ppdb,
metricName='association.numTotalUnassociatedDiaObjects')
self.assertIsInstance(meas, Measurement)
self.assertEqual(
@@ -307,12 +305,9 @@ def testNoMetric(self):
self.butler, dataId=dataIdDict,
metricName='foo.bar.FooBar')

# Fake DB handle to avoid DB initialization overhead
cursor = NonCallableMock(spec=sqlite3.Cursor)
Member

This comment is obsolete, since you deleted the fake.

cursor.fetchall.return_value = [(0,)]
with self.assertRaises(TypeError):
measureTotalUnassociatedDiaObjects(
cursor, metricName='foo.bar.FooBar')
self.ppdb, metricName='foo.bar.FooBar')


class MemoryTester(lsst.utils.tests.MemoryTestCase):
6 changes: 3 additions & 3 deletions tests/test_driver.py
@@ -196,7 +196,7 @@ def testRunApPipeCustomConfig(self):
with open(configFile, "w") as f:
# Illegal value; would never be set by a real config
f.write("config.differencer.doWriteSources = False\n")
f.write("config.associator.level1_db.db_name = ':memory:'\n")
f.write("config.ppdb.db_url = 'sqlite://'\n")

task = self.setUpMockPatch("lsst.ap.pipe.ApPipeTask",
spec=True,
@@ -211,7 +211,7 @@
self.assertIn("config", kwargs)
taskConfig = kwargs["config"]
self.assertFalse(taskConfig.differencer.doWriteSources)
self.assertNotEqual(taskConfig.associator.level1_db.db_name, self.workspace.dbLocation)
self.assertNotEqual(taskConfig.ppdb.db_url, "sqlite:///" + self.workspace.dbLocation)

def testRunApPipeWorkspaceDb(self):
"""Test that runApPipe places a database in the workspace location by default.
@@ -228,7 +228,7 @@
kwargs = call[2]
self.assertIn("config", kwargs)
taskConfig = kwargs["config"]
self.assertEqual(taskConfig.associator.level1_db.db_name, self.workspace.dbLocation)
self.assertEqual(taskConfig.ppdb.db_url, "sqlite:///" + self.workspace.dbLocation)


class MemoryTester(lsst.utils.tests.MemoryTestCase):