GEOMESA-176 Implement an 'explain' query feature.
Adds an 'explainQuery' feature.

As a rough example/guide, try the following in the Maven Scala console (i.e., run 'mvn scala:console -pl geomesa-core'):

import org.geotools.data._
import org.geotools.filter.text.ecql.ECQL
import geomesa.core.data._                 // AccumuloFeatureReader
import geomesa.core.filter.TestFilters._
import scala.collection.JavaConversions._  // implicit Scala Map -> java.util.Map for DataStoreFinder

val aparams = Map(
  "instanceId"   -> "INSTANCE_ID",
  "zookeepers"   -> "ZOOKEEPERS",
  "user"         -> "USERNAME",
  "password"     -> "PASSWORD",
  "auths"        -> "AUTHS",
  "tableName"    -> "TABLE_NAME",
  "useMapReduce" -> "false")
val ds = DataStoreFinder.getDataStore(aparams)

val q = new Query()
val t = Transaction.AUTO_COMMIT

q.setTypeName("gdelt")

val afr = ds.getFeatureReader(q, t).asInstanceOf[AccumuloFeatureReader]
val is = afr.indexSchema

is.explainQuery(q)

val fs = "INTERSECTS(geom, POLYGON ((41 28, 42 28, 42 29, 41 29, 41 28)))"
val f = ECQL.toFilter(fs)
q.setFilter(f)

is.explainQuery(q)
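
The 'output' parameter of 'explainQuery' defaults to 'println'. As a minimal sketch, assuming the same 'is' and 'q' from the session above, the explanation can be collected into a buffer instead of printed:

val plan = new StringBuilder
is.explainQuery(q, s => plan.append(s).append("\n"))  // collect each explanation line rather than printing it
println(plan.toString)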
jnh5y authored and cne1x committed Jul 15, 2014
1 parent 0a4d097 commit 775634b
Showing 10 changed files with 299 additions and 83 deletions.
@@ -0,0 +1,32 @@
package geomesa.core.data

import com.typesafe.scalalogging.slf4j.Logging
import org.apache.accumulo.core.client.{BatchScanner, Scanner}
import org.opengis.feature.simple.SimpleFeatureType

trait AccumuloConnectorCreator extends Logging {

/**
* Create a BatchScanner for the SpatioTemporal Index Table
*
* @param numThreads number of threads for the BatchScanner
*/
def createSpatioTemporalIdxScanner(sft: SimpleFeatureType, numThreads: Int): BatchScanner

/**
* Create a BatchScanner for the SpatioTemporal Index Table
*/
def createSTIdxScanner(sft: SimpleFeatureType): BatchScanner

/**
* Create a Scanner for the Attribute Table (Inverted Index Table)
*/
def createAttrIdxScanner(sft: SimpleFeatureType): Scanner

/**
* Create a BatchScanner to retrieve only Records (SimpleFeatures)
*/
def createRecordScanner(sft: SimpleFeatureType, numThreads: Int = 0) : BatchScanner

def catalogTableFormat(sft: SimpleFeatureType): Boolean
}
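
For context, a hypothetical sketch (not part of this commit; the DataStore-backed and explaining implementations it adds are not shown in this excerpt) of how an implementor of the trait above might delegate to a live Accumulo Connector. The class name, table-name parameters, and fallback thread count are illustrative assumptions only:

import org.apache.accumulo.core.client.{BatchScanner, Connector, Scanner}
import org.apache.accumulo.core.security.Authorizations
import org.opengis.feature.simple.SimpleFeatureType

// Hypothetical example class; not part of this commit.
class SimpleConnectorCreator(connector: Connector,
                             auths: Authorizations,
                             stIdxTable: String,
                             attrIdxTable: String,
                             recordTable: String) extends AccumuloConnectorCreator {

  // Batch scan over the spatio-temporal index table with an explicit thread count
  def createSpatioTemporalIdxScanner(sft: SimpleFeatureType, numThreads: Int): BatchScanner =
    connector.createBatchScanner(stIdxTable, auths, numThreads)

  // Batch scan over the spatio-temporal index table with an illustrative default of 8 threads
  def createSTIdxScanner(sft: SimpleFeatureType): BatchScanner =
    createSpatioTemporalIdxScanner(sft, 8)

  // Single-threaded scan over the attribute (inverted index) table
  def createAttrIdxScanner(sft: SimpleFeatureType): Scanner =
    connector.createScanner(attrIdxTable, auths)

  // Batch scan over the record table; fall back to 8 threads when none requested
  def createRecordScanner(sft: SimpleFeatureType, numThreads: Int): BatchScanner =
    connector.createBatchScanner(recordTable, auths, if (numThreads > 0) numThreads else 8)

  // Assume the catalog table format that includes the attribute index
  def catalogTableFormat(sft: SimpleFeatureType): Boolean = true
}
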
@@ -64,7 +64,7 @@ class AccumuloDataStore(val connector: Connector,
val writeVisibilities: String,
val spatioTemporalIdxSchemaFmt: String = "DEFAULT",
val featureEncoding: FeatureEncoding = FeatureEncoding.AVRO)
extends AbstractDataStore(true) with Logging {
extends AbstractDataStore(true) with AccumuloConnectorCreator with Logging {

// TODO default to zero shards (needs testing)
private val DEFAULT_MAX_SHARD = 99
@@ -1,6 +1,5 @@
package geomesa.core.index


import java.nio.charset.StandardCharsets
import java.util.Map.Entry

@@ -44,7 +43,6 @@ case class IndexQueryPlanner(keyPlanner: KeyPlanner,
schema:String,
featureType: SimpleFeatureType,
featureEncoder: SimpleFeatureEncoder) extends Logging {

def buildFilter(poly: Polygon, interval: Interval): KeyPlanningFilter =
(IndexSchema.somewhere(poly), IndexSchema.somewhen(interval)) match {
case (None, None) => AcceptEverythingFilter
@@ -72,26 +70,33 @@ case class IndexQueryPlanner(keyPlanner: KeyPlanner,
case _ => IndexSchema.everywhen.overlap(interval)
}

def log(s: String) = logger.trace(s)

// As a pre-processing step, we examine the query/filter and split it into multiple queries.
// TODO: Work to make the queries non-overlapping.
def getIterator(ds: AccumuloDataStore, sft: SimpleFeatureType, query: Query): CloseableIterator[Entry[Key,Value]] = {
def getIterator(acc: AccumuloConnectorCreator,
sft: SimpleFeatureType,
query: Query,
output: String => Unit = log): CloseableIterator[Entry[Key,Value]] = {
val ff = CommonFactoryFinder.getFilterFactory2
val isDensity = query.getHints.containsKey(BBOX_KEY)
val queries: Iterator[Query] =
if(isDensity) {
val env = query.getHints.get(BBOX_KEY).asInstanceOf[ReferencedEnvelope]
val q1 = new Query(featureType.getTypeName, ff.bbox(ff.property(featureType.getGeometryDescriptor.getLocalName), env))
Iterator(DataUtilities.mixQueries(q1, query, "geomesa.mixed.query"))
} else splitQueryOnOrs(query)
} else splitQueryOnOrs(query, output)

queries.flatMap(runQuery(ds, sft, _, isDensity))
queries.flatMap(runQuery(acc, sft, _, isDensity, output))
}

def splitQueryOnOrs(query: Query): Iterator[Query] = {
def splitQueryOnOrs(query: Query, output: String => Unit): Iterator[Query] = {
val originalFilter = query.getFilter
output(s"Originalfilter is $originalFilter")

val rewrittenFilter = rewriteFilter(originalFilter)

output(s"Filter is rewritten as $rewrittenFilter")

val orSplitter = new OrSplittingFilter
val splitFilters = orSplitter.visit(rewrittenFilter, null)

@@ -114,40 +119,45 @@ case class IndexQueryPlanner(keyPlanner: KeyPlanner,
*
* If the query is a density query use the spatio-temporal index table only
*/
private def runQuery(ds: AccumuloDataStore, sft: SimpleFeatureType, derivedQuery: Query, isDensity: Boolean) = {
private def runQuery(acc: AccumuloConnectorCreator,
sft: SimpleFeatureType,
derivedQuery: Query,
isDensity: Boolean,
output: String => Unit) = {
val filterVisitor = new FilterToAccumulo(featureType)
val rewrittenFilter = filterVisitor.visit(derivedQuery)
if(ds.catalogTableFormat(sft)){
if(acc.catalogTableFormat(sft)){
// If we have attr index table try it
runAttrIdxQuery(ds, derivedQuery, rewrittenFilter, filterVisitor, isDensity)
runAttrIdxQuery(acc, derivedQuery, rewrittenFilter, filterVisitor, isDensity, output)
} else {
// datastore doesn't support attr index use spatiotemporal only
stIdxQuery(ds, derivedQuery, rewrittenFilter, filterVisitor)
stIdxQuery(acc, derivedQuery, rewrittenFilter, filterVisitor, output)
}
}

/**
* Attempt to run a query against the attribute index if it can be satisfied
* there...if not run against the SpatioTemporal
*/
def runAttrIdxQuery(ds: AccumuloDataStore,
def runAttrIdxQuery(acc: AccumuloConnectorCreator,
derivedQuery: Query,
rewrittenFilter: Filter,
filterVisitor: FilterToAccumulo,
isDensity: Boolean) = {
isDensity: Boolean,
output: String => Unit) = {

rewrittenFilter match {
case isEqualTo: PropertyIsEqualTo if !isDensity =>
attrIdxEqualToQuery(ds, derivedQuery, isEqualTo, filterVisitor)
attrIdxEqualToQuery(acc, derivedQuery, isEqualTo, filterVisitor)

case like: PropertyIsLike if !isDensity =>
if(likeEligible(like))
attrIdxLikeQuery(ds, derivedQuery, like, filterVisitor)
attrIdxLikeQuery(acc, derivedQuery, like, filterVisitor)
else
stIdxQuery(ds, derivedQuery, like, filterVisitor)
stIdxQuery(acc, derivedQuery, like, filterVisitor, output)

case cql =>
stIdxQuery(ds, derivedQuery, cql, filterVisitor)
stIdxQuery(acc, derivedQuery, cql, filterVisitor, output)
}
}

@@ -175,7 +185,7 @@ case class IndexQueryPlanner(keyPlanner: KeyPlanner,
/**
* Get an iterator that performs an eligible LIKE query against the Attribute Index Table
*/
def attrIdxLikeQuery(dataStore: AccumuloDataStore,
def attrIdxLikeQuery(acc: AccumuloConnectorCreator,
derivedQuery: Query,
filter: PropertyIsLike,
filterVisitor: FilterToAccumulo) = {
@@ -195,7 +205,7 @@ case class IndexQueryPlanner(keyPlanner: KeyPlanner,

val range = AccRange.prefix(formatAttrIdxRow(prop, value))

attrIdxQuery(dataStore, derivedQuery, filterVisitor, range)
attrIdxQuery(acc, derivedQuery, filterVisitor, range)
}

def formatAttrIdxRow(prop: String, lit: String) =
@@ -204,7 +214,7 @@ case class IndexQueryPlanner(keyPlanner: KeyPlanner,
/**
* Get an iterator that performs an EqualTo query against the Attribute Index Table
*/
def attrIdxEqualToQuery(dataStore: AccumuloDataStore,
def attrIdxEqualToQuery(acc: AccumuloConnectorCreator,
derivedQuery: Query,
filter: PropertyIsEqualTo,
filterVisitor: FilterToAccumulo) = {
@@ -224,19 +234,19 @@ case class IndexQueryPlanner(keyPlanner: KeyPlanner,

val range = new AccRange(formatAttrIdxRow(prop, lit))

attrIdxQuery(dataStore, derivedQuery, filterVisitor, range)
attrIdxQuery(acc, derivedQuery, filterVisitor, range)
}

/**
* Perform scan against the Attribute Index Table and get an iterator returning records from the Record table
*/
def attrIdxQuery(dataStore: AccumuloDataStore,
def attrIdxQuery(acc: AccumuloConnectorCreator,
derivedQuery: Query,
filterVisitor: FilterToAccumulo,
range: AccRange) = {

logger.trace(s"Scanning attribute table for feature type ${featureType.getTypeName}")
val attrScanner = dataStore.createAttrIdxScanner(featureType)
val attrScanner = acc.createAttrIdxScanner(featureType)

val spatialOpt =
for {
@@ -262,7 +272,7 @@ case class IndexQueryPlanner(keyPlanner: KeyPlanner,
val ranges = attrScanner.iterator.map(_.getKey.getColumnFamily).map(new AccRange(_))

val recScanner = if(ranges.hasNext) {
val recordScanner = dataStore.createRecordScanner(featureType)
val recordScanner = acc.createRecordScanner(featureType)
recordScanner.setRanges(ranges.toList)
configureSimpleFeatureFilteringIterator(recordScanner, featureType, None, derivedQuery)
Some(recordScanner)
@@ -278,7 +288,11 @@ case class IndexQueryPlanner(keyPlanner: KeyPlanner,
CloseableIterator(iter, close)
}

def stIdxQuery(ds: AccumuloDataStore, query: Query, rewrittenCQL: Filter, filterVisitor: FilterToAccumulo) = {
def stIdxQuery(acc: AccumuloConnectorCreator,
query: Query,
rewrittenCQL: Filter,
filterVisitor: FilterToAccumulo,
output: String => Unit) = {
logger.trace(s"Scanning ST index table for feature type ${featureType.getTypeName}")
val ecql = Option(ECQL.toCQL(rewrittenCQL))

@@ -297,15 +311,17 @@ case class IndexQueryPlanner(keyPlanner: KeyPlanner,
val oint = IndexSchema.somewhen(interval)

// set up row ranges and regular expression filter
val bs = ds.createSTIdxScanner(featureType)
planQuery(bs, filter)
val bs = acc.createSTIdxScanner(featureType)

planQuery(bs, filter, output)

logger.trace("Configuring batch scanner for ST table: " +
"Poly: "+ opoly.getOrElse("No poly")+
"Interval: " + oint.getOrElse("No interval")+
"Filter: " + Option(filter).getOrElse("No Filter")+
"ECQL: " + Option(ecql).getOrElse("No ecql")+
"Query: " + Option(query).getOrElse("no query"))
output("Configuring batch scanner for ST table: \n" +
s" Filter ${query.getFilter}\n" +
s" Poly: ${opoly.getOrElse("No poly")}\n" +
s" Interval: ${oint.getOrElse("No interval")}\n" +
s" Filter: ${Option(filter).getOrElse("No Filter")}\n" +
s" ECQL: ${Option(ecql).getOrElse("No ecql")}\n" +
s"Query: ${Option(query).getOrElse("no query")}.")

val iteratorConfig = IteratorTrigger.chooseIterator(ecql, query, featureType)

@@ -426,8 +442,9 @@ case class IndexQueryPlanner(keyPlanner: KeyPlanner,
def randomPrintableString(length:Int=5) : String = (1 to length).
map(i => Random.nextPrintableChar()).mkString

def planQuery(bs: BatchScanner, filter: KeyPlanningFilter): BatchScanner = {
val keyPlan = keyPlanner.getKeyPlan(filter)
def planQuery(bs: BatchScanner, filter: KeyPlanningFilter, output: String => Unit): BatchScanner = {
output(s"Planning query/configurating batch scanner: $bs")
val keyPlan = keyPlanner.getKeyPlan(filter, output)
val columnFamilies = cfPlanner.getColumnFamiliesToFetch(filter)

// always try to use range(s) to remove easy false-positives
@@ -446,7 +463,12 @@ case class IndexQueryPlanner(keyPlanner: KeyPlanner,

// if you have a list of distinct column-family entries, fetch them
columnFamilies match {
case KeyList(keys) => keys.foreach(cf => bs.fetchColumnFamily(new Text(cf)))
case KeyList(keys) => {
output(s"Settings ${keys.size} col fams: $keys.")
keys.foreach { cf =>
bs.fetchColumnFamily(new Text(cf))
}
}
case _ => // do nothing
}

@@ -91,16 +91,21 @@ case class IndexSchema(encoder: IndexEncoder,
}


def query(query: Query, ds: AccumuloDataStore): CloseableIterator[SimpleFeature] = {
def query(query: Query, acc: AccumuloConnectorCreator): CloseableIterator[SimpleFeature] = {
// Perform the query
logger.trace(s"Running ${query.toString}")

val accumuloIterator = planner.getIterator(ds, featureType, query)
val accumuloIterator = planner.getIterator(acc, featureType, query)

// Convert Accumulo results to SimpleFeatures
adaptIterator(accumuloIterator, query)
}

// Writes out an explanation of how a query would be run.
def explainQuery(q: Query, output: (String => Unit) = println) = {
planner.getIterator(new ExplainingConnectorCreator(output), featureType, q, output)
}

// This function decodes/transforms that Iterator of Accumulo Key-Values into an Iterator of SimpleFeatures.
def adaptIterator(accumuloIterator: CloseableIterator[Entry[Key,Value]], query: Query): CloseableIterator[SimpleFeature] = {
val returnSFT = getReturnSFT(query)
