Skip to content

Commit

Permalink
GEOMESA-1251,GEOMESA-1250 Adding top-k stat, allowing stat gathering …
Browse files Browse the repository at this point in the history
…for non-indexed attributes (#929)

Signed-off-by: Emilio Lahr-Vivaz <elahrvivaz@ccri.com>
  • Loading branch information
elahrvivaz authored and anthonyccri committed Jun 18, 2016
1 parent 9bc45f7 commit 2287eb5
Show file tree
Hide file tree
Showing 20 changed files with 674 additions and 167 deletions.
31 changes: 17 additions & 14 deletions docs/user/commandline_tools.rst
Expand Up @@ -35,7 +35,7 @@ Run ``geomesa`` without any arguments to produce the following usage text::
stats-analyze Analyze statistics on a GeoMesa feature type
stats-bounds View or calculate bounds on attributes in a GeoMesa feature type
stats-count Estimate or calculate feature counts in a GeoMesa feature type
stats-enumerate Enumerate attribute values in a GeoMesa feature type
stats-top-k Enumerate the most frequent values in a GeoMesa feature type
stats-histogram        View or calculate counts of attributes in a GeoMesa feature type, grouped by sorted values
removeschema Remove a schema and associated features from a GeoMesa catalog
tableconf Perform table configuration operations
Expand Down Expand Up @@ -509,24 +509,27 @@ Example usage::
Count: 182


stats-enumerate
~~~~~~~~~~~~~~~
stats-top-k
~~~~~~~~~~~

Enumerates the values for attributes in your data set. You can enumerate all values, or only values for
features that match a CQL filter.
Enumerates the values for attributes in your data set. You can enumerate all values for all features,
or only values for features that match a CQL filter.

Example usage::

$ geomesa stats-enumerate -u username -p password -i instance -z zoo1,zoo2,zoo3 \
-c geomesa.data -f twitter -a user_id
Running stat query...
Values for 'user_id':
3144822634 (26383)
388009236 (20457)
497145453 (19514)
$ geomesa stats-top-k -u username -p password -i instance -z zoo1,zoo2,zoo3 \
-c geomesa.data -f twitter -a user_id -k 10
Top values for 'user_id':
3144822634 (26681)
388009236 (20553)
497145453 (19858)
563319506 (15848)
2841269945 (15716)
...
2841269945 (15763)
2924224280 (15731)
141302910 (15240)
2587789764 (14811)
56266341 (14487)
889599440 (14330)

stats-histogram
~~~~~~~~~~~~~~~
Expand Down
Expand Up @@ -104,9 +104,10 @@ class GeoMesaMetadataStats(val ds: AccumuloDataStore, statsTable: String)
override def getStats[T <: Stat](sft: SimpleFeatureType,
attributes: Seq[String],
options: Seq[Any])(implicit ct: ClassTag[T]): Seq[T] = {
val toRetrieve = {
val available = GeoMesaStats.statAttributesFor(sft)
if (attributes.isEmpty) available else attributes.filter(available.contains)
val toRetrieve = if (attributes.nonEmpty) {
attributes.filter(a => Option(sft.getDescriptor(a)).exists(GeoMesaStats.okForStats))
} else {
sft.getAttributeDescriptors.filter(GeoMesaStats.okForStats).map(_.getLocalName)
}

val clas = ct.runtimeClass
Expand All @@ -115,6 +116,8 @@ class GeoMesaMetadataStats(val ds: AccumuloDataStore, statsTable: String)
readStat[CountStat](sft, countKey()).toSeq
} else if (clas == classOf[MinMax[_]]) {
toRetrieve.flatMap(a => readStat[MinMax[Any]](sft, minMaxKey(a)))
} else if (clas == classOf[TopK[_]]) {
toRetrieve.flatMap(a => readStat[TopK[Any]](sft, topKKey(a)))
} else if (clas == classOf[Histogram[_]]) {
toRetrieve.flatMap(a => readStat[Histogram[Any]](sft, histogramKey(a)))
} else if (clas == classOf[Frequency[_]]) {
Expand Down Expand Up @@ -219,31 +222,31 @@ class GeoMesaMetadataStats(val ds: AccumuloDataStore, statsTable: String)
*/
private [stats] def writeStat(stat: Stat, sft: SimpleFeatureType, merge: Boolean): Unit = {

def shouldWrite(keyAndStat: (String, Stat)): Boolean = {
if (merge && keyAndStat._2.isInstanceOf[CountStat]) {
def shouldWrite(ks: KeyAndStat): Boolean = {
if (merge && ks.stat.isInstanceOf[CountStat]) {
// count stat is additive so we don't want to compare to the current value
true
} else {
// only re-write if it's changed - writes and compactions are expensive
!readStat[Stat](sft, keyAndStat._1, cache = false).exists(_.isEquivalent(keyAndStat._2))
!readStat[Stat](sft, ks.key, cache = false).exists(_.isEquivalent(ks.stat))
}
}

val toWrite = getKeysAndStatsForWrite(stat, sft).filter(shouldWrite)

if (merge) {
toWrite.foreach { case (k, s) =>
metadata.insert(sft.getTypeName, k, s)
toWrite.foreach { ks =>
metadata.insert(sft.getTypeName, ks.key, ks.stat)
// re-load it so that the combiner takes effect
readStat[Stat](sft, k, cache = false)
readStat[Stat](sft, ks.key, cache = false)
}
} else {
// due to accumulo issues with combiners, deletes and compactions, we have to:
// 1) delete the existing data; 2) compact the table; 3) insert the new value
// see: https://issues.apache.org/jira/browse/ACCUMULO-2232
toWrite.foreach { case (k, _) => metadata.remove(sft.getTypeName, k) }
toWrite.foreach(ks => metadata.remove(sft.getTypeName, ks.key))
compact()
toWrite.foreach { case (k, s) => metadata.insert(sft.getTypeName, k, s) }
toWrite.foreach(ks => metadata.insert(sft.getTypeName, ks.key, ks.stat))
}
}

Expand All @@ -254,31 +257,32 @@ class GeoMesaMetadataStats(val ds: AccumuloDataStore, statsTable: String)
* @param sft simple feature type
* @return metadata keys and split stats
*/
private def getKeysAndStatsForWrite(stat: Stat, sft: SimpleFeatureType): Seq[(String, Stat)] = {
private def getKeysAndStatsForWrite(stat: Stat, sft: SimpleFeatureType): Seq[KeyAndStat] = {
def name(i: Int) = sft.getDescriptor(i).getLocalName

stat match {
case s: SeqStat => s.stats.flatMap(getKeysAndStatsForWrite(_, sft))
case s: CountStat => Seq((countKey(), s))
case s: MinMax[_] => Seq((minMaxKey(name(s.attribute)), s))
case s: Histogram[_] => Seq((histogramKey(name(s.attribute)), s))
case s: CountStat => Seq(KeyAndStat(countKey(), s))
case s: MinMax[_] => Seq(KeyAndStat(minMaxKey(name(s.attribute)), s))
case s: TopK[_] => Seq(KeyAndStat(topKKey(name(s.attribute)), s))
case s: Histogram[_] => Seq(KeyAndStat(histogramKey(name(s.attribute)), s))

case s: Frequency[_] =>
val attribute = name(s.attribute)
if (s.dtgIndex == -1) {
Seq((frequencyKey(attribute), s))
Seq(KeyAndStat(frequencyKey(attribute), s))
} else {
// split up the frequency and store by week
s.splitByWeek.map { case (w, f) => (frequencyKey(attribute, w), f) }
s.splitByWeek.map { case (w, f) => KeyAndStat(frequencyKey(attribute, w), f) }
}

case s: Z3Histogram =>
val geom = name(s.geomIndex)
val dtg = name(s.dtgIndex)
// split up the z3 histogram and store by week
s.splitByWeek.map { case (w, z) => (histogramKey(geom, dtg, w), z) }
s.splitByWeek.map { case (w, z) => KeyAndStat(histogramKey(geom, dtg, w), z) }

case _ => throw new NotImplementedError("Only Count, Frequency, MinMax and Histogram stats are tracked")
case _ => throw new NotImplementedError("Only Count, Frequency, MinMax, TopK and Histogram stats are tracked")
}
}

Expand Down Expand Up @@ -320,27 +324,38 @@ class GeoMesaMetadataStats(val ds: AccumuloDataStore, statsTable: String)
*/
private def buildStatsFor(sft: SimpleFeatureType): String = {
import GeoMesaStats._
import org.locationtech.geomesa.utils.geotools.RichAttributeDescriptors.RichAttributeDescriptor

val attributes = statAttributesFor(sft).map(a => (a, sft.getDescriptor(a).getType.getBinding))
// get the attributes that we will keep stats for
val stAttributes = Option(sft.getGeomField).toSeq ++ sft.getDtgField
val indexedAttributes = sft.getAttributeDescriptors.filter(d => d.isIndexed && okForStats(d)).map(_.getLocalName)
val flaggedAttributes = sft.getAttributeDescriptors.filter(d => d.isKeepStats && okForStats(d)).map(_.getLocalName)

val count = Stat.Count()
val minMax = attributes.map(a => Stat.MinMax(a._1))

// calculate min/max for all attributes
val minMax = (stAttributes ++ indexedAttributes ++ flaggedAttributes).distinct.map(Stat.MinMax)

    // calculate topk for indexed and flagged attributes, but not geom + date
val topK = (indexedAttributes ++ flaggedAttributes).distinct.map(Stat.TopK)

// calculate frequencies only for indexed attributes
val frequencies = {
import org.locationtech.geomesa.utils.geotools.RichAttributeDescriptors.RichAttributeDescriptor
val indexed = attributes.filter { case (a, _) => sft.getDescriptor(a).isIndexed }
val descriptors = indexedAttributes.map(sft.getDescriptor)
// calculate one frequency that's split by week, and one that isn't
// for queries with time bounds, the split by week will be more accurate
// for queries without time bounds, we save the overhead of merging the weekly splits
val withDates = sft.getDtgField match {
case None => Seq.empty
case Some(dtg) => indexed.map { case (a, b) => Stat.Frequency(a, dtg, defaultPrecision(b)) }
case Some(dtg) => descriptors.map(d => Stat.Frequency(d.getLocalName, dtg, defaultPrecision(d.getType.getBinding)))
}
val noDates = indexed.map { case (a, b) => Stat.Frequency(a, defaultPrecision(b)) }
val noDates = descriptors.map(d => Stat.Frequency(d.getLocalName, defaultPrecision(d.getType.getBinding)))
withDates ++ noDates
}

val histograms = attributes.map { case (attribute, binding) =>
// calculate histograms for all indexed attributes and geom/date
val histograms = (stAttributes ++ indexedAttributes).distinct.map { attribute =>
val binding = sft.getDescriptor(attribute).getType.getBinding
// calculate the endpoints for the histogram
// the histogram will expand as needed, but this is a starting point
val bounds = {
Expand All @@ -361,13 +376,13 @@ class GeoMesaMetadataStats(val ds: AccumuloDataStore, statsTable: String)
}

val z3Histogram = for {
geom <- attributes.find(_._1 == sft.getGeomField).map(_._1)
dtg <- sft.getDtgField.filter(attributes.map(_._1).contains)
geom <- Option(sft.getGeomField).filter(stAttributes.contains)
dtg <- sft.getDtgField.filter(stAttributes.contains)
} yield {
Stat.Z3Histogram(geom, dtg, MaxHistogramSize)
}

Stat.SeqStat(Seq(count) ++ minMax ++ histograms ++ frequencies ++ z3Histogram)
Stat.SeqStat(Seq(count) ++ minMax ++ topK ++ histograms ++ frequencies ++ z3Histogram)
}
}

Expand Down Expand Up @@ -423,6 +438,7 @@ object GeoMesaMetadataStats {

private val CountKey = "stats-count"
private val BoundsKeyPrefix = "stats-bounds"
private val TopKKeyPrefix = "stats-topk"
private val FrequencyKeyPrefix = "stats-freq"
private val HistogramKeyPrefix = "stats-hist"

Expand Down Expand Up @@ -470,6 +486,9 @@ object GeoMesaMetadataStats {
// gets the key for storing a min-max
private [stats] def minMaxKey(attribute: String): String = s"$BoundsKeyPrefix-$attribute"

  // gets the key for storing a top-k
private [stats] def topKKey(attribute: String): String = s"$TopKKeyPrefix-$attribute"

// gets the key for storing a frequency attribute
private [stats] def frequencyKey(attribute: String): String =
s"$FrequencyKeyPrefix-$attribute"
Expand All @@ -484,4 +503,6 @@ object GeoMesaMetadataStats {
// gets the key for storing a Z3 histogram
private [stats] def histogramKey(geom: String, dtg: String, week: Short): String =
histogramKey(s"$geom-$dtg-$week")

private case class KeyAndStat(key: String, stat: Stat)
}
Expand Up @@ -114,7 +114,6 @@ object GeoMesaStats {
import java.lang.{Double => jDouble, Float => jFloat, Long => jLong}

import org.locationtech.geomesa.utils.geotools.RichAttributeDescriptors.RichAttributeDescriptor
import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType

// date bucket size in milliseconds for the date frequency - one day
val DateFrequencyPrecision = 1000 * 60 * 60 * 24
Expand Down Expand Up @@ -176,13 +175,6 @@ object GeoMesaStats {
// TODO GEOMESA-1217 support list/maps in stats
def okForStats(d: AttributeDescriptor): Boolean =
!d.isMultiValued && StatClasses.exists(_.isAssignableFrom(d.getType.getBinding))

// get the attributes that we will keep stats for
private [stats] def statAttributesFor(sft: SimpleFeatureType): Seq[String] = {
import scala.collection.JavaConversions._
val indexed = sft.getAttributeDescriptors.filter(d => d.isIndexed && okForStats(d)).map(_.getLocalName)
(Option(sft.getGeomField).toSeq ++ sft.getDtgField ++ indexed).distinct
}
}

trait HasGeoMesaStats {
Expand Down

0 comments on commit 2287eb5

Please sign in to comment.