Skip to content

Commit

Permalink
Extend YAML schema with region column declarations.
Browse files Browse the repository at this point in the history
DataUnits with regions and DataUnitRegions now have explicit declaration
for which table column contains region data. This also adds
DataUnitRegion class so that region column declaration can be used by
client code.
  • Loading branch information
andy-slac committed Jul 23, 2018
1 parent f25f924 commit 0b10bbd
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 23 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ doc/_build
doc/py-api
butler_test_repository
bin/*
.coverage
4 changes: 4 additions & 0 deletions config/schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ schema:
tgt:
- Visit.camera
- Visit.visit
regionColumn: region
tables:
Visit:
doc: >
Expand Down Expand Up @@ -535,6 +536,7 @@ schema:
tgt:
- Tract.skymap
- Tract.tract
regionColumn: region
tables:
Tract:
doc: >
Expand Down Expand Up @@ -591,6 +593,7 @@ schema:
- Patch.skymap
- Patch.tract
- Patch.patch
regionColumn: region
tables:
Patch:
doc: >
Expand Down Expand Up @@ -669,6 +672,7 @@ schema:
relates:
- Visit
- Sensor
regionColumn: region
tables:
VisitSensorRegion:
doc: >
Expand Down
96 changes: 88 additions & 8 deletions python/lsst/daf/butler/core/dataUnit.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,19 @@ class DataUnit:
table : `sqlalchemy.core.Table`, optional
When not ``None`` the primary table entry corresponding to this
`DataUnit`.
regionColumn : `str`, optional
Name of the column with encoded region specification, only for tables
which define region column in their schema.
"""
def __init__(self, name, requiredDependencies, optionalDependencies, link=(), table=None):
def __init__(self, name, requiredDependencies, optionalDependencies,
link=(), table=None, regionColumn=None):
self._name = name
self._requiredDependencies = frozenset(requiredDependencies)
self._optionalDependencies = frozenset(optionalDependencies)
self._table = table
self._link = link
self._primaryKey = None
self._regionColumn = regionColumn

def __repr__(self):
return "DataUnit({})".format(self.name)
Expand Down Expand Up @@ -131,6 +136,16 @@ def primaryKeyColumns(self):
"""
return {name: self.table.columns[name] for name in self.primaryKey}

@property
def regionColumn(self):
"""Table column (`sqlalchemy.Column`) with encoded region data,
``None`` if table has no region column.
"""
table = self.table
if table is not None and self._regionColumn is not None:
return table.c[self._regionColumn]
return None

def validateId(self, dataId):
"""Check if given dataId is valid.
Expand Down Expand Up @@ -212,6 +227,56 @@ def table(self):
return getattr(self, '_table', None)


class DataUnitRegion:
"""Represents a relationsip between two or more `DataUnit`s
with associated region.
Parameters
----------
name : `str`
Name of this `DataUnitRegion`, same as the name of the table.
relates : `tuple` of `str`
Names of the DataUnits in this relationship.
table : `sqlalchemy.Table`, optional
The table to be used for queries.
regionColumn : `str`, optional
Name of the column with encoded region specification, only makes
sense when ``table`` is not ``None``.
"""
def __init__(self, name, relates, table=None, regionColumn=None):
self._name = name
self._relates = relates
self._table = table
self._regionColumn = regionColumn

@property
def name(self):
"""Name of this `DataUnitRegion`, same as the name of the table.
"""
return self._name

@property
def relates(self):
return self._relates

@property
def table(self):
"""When not ``None`` the table entry corresponding to this
`DataUnitRegion` (`sqlalchemy.Table`, optional).
"""
return self._table

@property
def regionColumn(self):
"""Table column with encoded region data, ``None`` if table has no
region column (`sqlalchemy.Column`, optional).
"""
table = self.table
if table is not None and self._regionColumn is not None:
return table.c[self._regionColumn]
return None


class DataUnitRegistry:
"""Instances of this class keep track of `DataUnit` relations.
Expand Down Expand Up @@ -272,12 +337,20 @@ def items(self):
for dataUnitName in self._dataUnitNames:
yield (dataUnitName, self[dataUnitName])

def getRegionTable(self, *dataUnitNames):
"""Return the region table that holds regions for the given combination
of DataUnits.
def getRegionHolder(self, *dataUnitNames):
"""Return the DataUnit or DataUnitRegion that holds region for the
given combination of DataUnits.
Returned object can be either `DataUnitRegion` or `DataUnit`. Use
``table`` and/or ``regionColumn`` properties of returned object to
retrieve region data from database table.
Returns
-------
`DataUnitRegion` or `DataUnit` instance.
"""
if len(dataUnitNames) == 1:
return self[dataUnitNames[0]].table
return self[dataUnitNames[0]]
return self._dataUnitRegions[frozenset(dataUnitNames)]

def getJoin(self, lhs, rhs):
Expand Down Expand Up @@ -339,6 +412,7 @@ def _initDataUnits(self, config, builder):
requiredDependencies = ()
optionalDependencies = ()
table = None
regionColumn = None
link = ()
if 'dependencies' in dataUnitDescription:
dependencies = dataUnitDescription['dependencies']
Expand All @@ -363,14 +437,16 @@ def _initDataUnits(self, config, builder):
if tableName == dataUnitName:
# Primary table for this DataUnit
table = builder.addTable(tableName, tableDescription)
regionColumn = dataUnitDescription.get('regionColumn')
else:
# Secondary table
builder.addTable(tableName, tableDescription)
dataUnit = DataUnit(name=dataUnitName,
requiredDependencies=requiredDependencies,
optionalDependencies=optionalDependencies,
table=table,
link=link)
link=link,
regionColumn=regionColumn)
self[dataUnitName] = dataUnit

def _initDataUnitRegions(self, config, builder):
Expand All @@ -388,9 +464,13 @@ def _initDataUnitRegions(self, config, builder):
[(tableName, tableDescription)] = description["tables"].items()
if builder is not None:
table = builder.addTable(tableName, tableDescription)
duRegion = DataUnitRegion(name=tableName,
relates=tuple(description["relates"]),
table=table,
regionColumn=description.get("regionColumn"))
else:
table = None
self._dataUnitRegions[dataUnitNames] = table
duRegion = None
self._dataUnitRegions[dataUnitNames] = duRegion

def _initDataUnitJoins(self, config, builder):
"""Initialize `DataUnit` join entries.
Expand Down
31 changes: 17 additions & 14 deletions python/lsst/daf/butler/registries/sqlPreFlight.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def _filterSummarizes(dataUnitJoins):


def _filterRegions(rowIter, firstRegionIndex):
"""Fiters result rows that have non-overlapping regions.
"""Filter result rows that have non-overlapping regions.
Result set generated by query in selectDataUnits() method can include
set of regions in each row (encoded as bytes). Due to pixel-based
Expand All @@ -78,7 +78,8 @@ def _filterRegions(rowIter, firstRegionIndex):
Iterator for rows returned by the query on registry
firstRegionIndex : `int` or ``None``
If not ``None`` then this is the starting position of the regions
in the row, all regions are encoded as bytes.
in the row, all columns starting with this position contain region
data. All regions are encoded as bytes.
"""
total = 0
if firstRegionIndex is not None:
Expand Down Expand Up @@ -212,43 +213,45 @@ def selectDataUnits(self, collections, expr, neededDatasetTypes, futureDatasetTy
dataUnitJoins = list(_filterSummarizes(dataUnitJoins))
_LOG.debug("filtered dataUnitJoins: %s", [join.name for join in dataUnitJoins])

regionTables = set()
joinedRegionTables = set()
firstRegionIndex = None
for dataUnitJoin in dataUnitJoins:

# TODO: do not know yet how to handle MultiCameraExposureJoin, skip it for now
if dataUnitJoin.lhs == dataUnitJoin.rhs:
continue

# Look at each side of the DataUnitJoin and join it with corresponding
# DataUnit tables, including making all necessary joins for special
# multi-DataUnit region table(s).
for connection in (dataUnitJoin.lhs, dataUnitJoin.rhs):
regionTable = self._schema.dataUnits.getRegionTable(*connection)
regionHolder = self._schema.dataUnits.getRegionHolder(*connection)
if len(connection) > 1:
# if one of the joins is with Visit/Sensor then also bring
# VisitSensorRegion table in and join it with the units
if regionTable.name in regionTables:
_LOG.debug("region table already added: %s", regionTable.name)
if regionHolder.name in joinedRegionTables:
_LOG.debug("region table already joined with units: %s", regionHolder.name)
else:
_LOG.debug("add region table: %s", regionTable.name)
regionTables.add(regionTable.name)
_LOG.debug("joining region table with units: %s", regionHolder.name)
joinedRegionTables.add(regionHolder.name)

for dataUnitName in connection:
dataUnit = self._schema.dataUnits[dataUnitName]
_LOG.debug(" joining region table with %s", dataUnitName)
for name, col in dataUnit.primaryKeyColumns.items():
_LOG.debug(" joining on column: %s", name)
where.append(regionTable.c[name] == col)
where.append(regionHolder.table.c[name] == col)

# now join region table with join table using PKs of all units
_LOG.debug("join %s with %s", dataUnitJoin.name, connection)
for colName in self._schema.dataUnits.getPrimaryKeyNames(connection):
_LOG.debug(" joining on column: %s", colName)
where.append(dataUnitJoin.table.c[colName] == regionTable.c[colName])
where.append(dataUnitJoin.table.c[colName] == regionHolder.table.c[colName])

# We also have to include regions from each side of the join into
# resultset so that we can filter-ot non-overlapping regions.
# TODO: Assume that region column name is "region", this probably
# needs to be a config parameter.
# resultset so that we can filter-out non-overlapping regions.
firstRegionIndex = len(header)
selectColumns.append(regionTable.c["region"])
selectColumns.append(regionHolder.regionColumn)

_LOG.debug("units where: %s", [str(x) for x in where])

Expand Down
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/registries/sqlRegistry.py
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,7 @@ def setDataUnitRegion(self, dataUnitNames, value, region, new=True):
dataUnit = self._schema.dataUnits[dataUnitName]
dataUnit.validateId(value)
keyColumns.update(dataUnit.primaryKeyColumns)
table = self._schema.dataUnits.getRegionTable(*dataUnitNames)
table = self._schema.dataUnits.getRegionHolder(*dataUnitNames).table
if table is None:
raise TypeError("No region table found for '{}'.".format(dataUnitNames))
# If a region record for these DataUnits already exists, use an update
Expand Down

0 comments on commit 0b10bbd

Please sign in to comment.