diff --git a/docs/user/commandline_tools.rst b/docs/user/commandline_tools.rst index da1a41a364a0..818ea3d878ff 100644 --- a/docs/user/commandline_tools.rst +++ b/docs/user/commandline_tools.rst @@ -35,7 +35,7 @@ Run ``geomesa`` without any arguments to produce the following usage text:: stats-analyze Analyze statistics on a GeoMesa feature type stats-bounds View or calculate bounds on attributes in a GeoMesa feature type stats-count Estimate or calculate feature counts in a GeoMesa feature type - stats-enumerate Enumerate attribute values in a GeoMesa feature type + stats-top-k Enumerate the most frequent values in a GeoMesa feature type stats-histogram View or calculate counts of attribute in a GeoMesa feature type, grouped by sorted values removeschema Remove a schema and associated features from a GeoMesa catalog tableconf Perform table configuration operations @@ -509,24 +509,27 @@ Example usage:: Count: 182 -stats-enumerate -~~~~~~~~~~~~~~~ +stats-top-k +~~~~~~~~~~~ -Enumerates the values for attributes in your data set. You can enumerate all values, or only values for -features that match a CQL filter. +Enumerates the most frequent values for attributes in your data set. You can calculate the top values +across all features, or only for features that match a CQL filter. Example usage:: - $ geomesa stats-enumerate -u username -p password -i instance -z zoo1,zoo2,zoo3 \ - -c geomesa.data -f twitter -a user_id - Running stat query... - Values for 'user_id': - 3144822634 (26383) - 388009236 (20457) - 497145453 (19514) + $ geomesa stats-top-k -u username -p password -i instance -z zoo1,zoo2,zoo3 \ - -c geomesa.data -f twitter -a user_id -k 10 + Top values for 'user_id': + 3144822634 (26681) + 388009236 (20553) + 497145453 (19858) 563319506 (15848) - 2841269945 (15716) - ...
+ 2841269945 (15763) + 2924224280 (15731) + 141302910 (15240) + 2587789764 (14811) + 56266341 (14487) + 889599440 (14330) stats-histogram ~~~~~~~~~~~~~~~ diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/stats/GeoMesaMetadataStats.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/stats/GeoMesaMetadataStats.scala index 405d1ca8c915..7d8344ad032e 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/stats/GeoMesaMetadataStats.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/stats/GeoMesaMetadataStats.scala @@ -104,9 +104,10 @@ class GeoMesaMetadataStats(val ds: AccumuloDataStore, statsTable: String) override def getStats[T <: Stat](sft: SimpleFeatureType, attributes: Seq[String], options: Seq[Any])(implicit ct: ClassTag[T]): Seq[T] = { - val toRetrieve = { - val available = GeoMesaStats.statAttributesFor(sft) - if (attributes.isEmpty) available else attributes.filter(available.contains) + val toRetrieve = if (attributes.nonEmpty) { + attributes.filter(a => Option(sft.getDescriptor(a)).exists(GeoMesaStats.okForStats)) + } else { + sft.getAttributeDescriptors.filter(GeoMesaStats.okForStats).map(_.getLocalName) } val clas = ct.runtimeClass @@ -115,6 +116,8 @@ class GeoMesaMetadataStats(val ds: AccumuloDataStore, statsTable: String) readStat[CountStat](sft, countKey()).toSeq } else if (clas == classOf[MinMax[_]]) { toRetrieve.flatMap(a => readStat[MinMax[Any]](sft, minMaxKey(a))) + } else if (clas == classOf[TopK[_]]) { + toRetrieve.flatMap(a => readStat[TopK[Any]](sft, topKKey(a))) } else if (clas == classOf[Histogram[_]]) { toRetrieve.flatMap(a => readStat[Histogram[Any]](sft, histogramKey(a))) } else if (clas == classOf[Frequency[_]]) { @@ -219,31 +222,31 @@ class GeoMesaMetadataStats(val ds: AccumuloDataStore, statsTable: String) */ 
private [stats] def writeStat(stat: Stat, sft: SimpleFeatureType, merge: Boolean): Unit = { - def shouldWrite(keyAndStat: (String, Stat)): Boolean = { - if (merge && keyAndStat._2.isInstanceOf[CountStat]) { + def shouldWrite(ks: KeyAndStat): Boolean = { + if (merge && ks.stat.isInstanceOf[CountStat]) { // count stat is additive so we don't want to compare to the current value true } else { // only re-write if it's changed - writes and compactions are expensive - !readStat[Stat](sft, keyAndStat._1, cache = false).exists(_.isEquivalent(keyAndStat._2)) + !readStat[Stat](sft, ks.key, cache = false).exists(_.isEquivalent(ks.stat)) } } val toWrite = getKeysAndStatsForWrite(stat, sft).filter(shouldWrite) if (merge) { - toWrite.foreach { case (k, s) => - metadata.insert(sft.getTypeName, k, s) + toWrite.foreach { ks => + metadata.insert(sft.getTypeName, ks.key, ks.stat) // re-load it so that the combiner takes effect - readStat[Stat](sft, k, cache = false) + readStat[Stat](sft, ks.key, cache = false) } } else { // due to accumulo issues with combiners, deletes and compactions, we have to: // 1) delete the existing data; 2) compact the table; 3) insert the new value // see: https://issues.apache.org/jira/browse/ACCUMULO-2232 - toWrite.foreach { case (k, _) => metadata.remove(sft.getTypeName, k) } + toWrite.foreach(ks => metadata.remove(sft.getTypeName, ks.key)) compact() - toWrite.foreach { case (k, s) => metadata.insert(sft.getTypeName, k, s) } + toWrite.foreach(ks => metadata.insert(sft.getTypeName, ks.key, ks.stat)) } } @@ -254,31 +257,32 @@ class GeoMesaMetadataStats(val ds: AccumuloDataStore, statsTable: String) * @param sft simple feature type * @return metadata keys and split stats */ - private def getKeysAndStatsForWrite(stat: Stat, sft: SimpleFeatureType): Seq[(String, Stat)] = { + private def getKeysAndStatsForWrite(stat: Stat, sft: SimpleFeatureType): Seq[KeyAndStat] = { def name(i: Int) = sft.getDescriptor(i).getLocalName stat match { case s: SeqStat => 
s.stats.flatMap(getKeysAndStatsForWrite(_, sft)) - case s: CountStat => Seq((countKey(), s)) - case s: MinMax[_] => Seq((minMaxKey(name(s.attribute)), s)) - case s: Histogram[_] => Seq((histogramKey(name(s.attribute)), s)) + case s: CountStat => Seq(KeyAndStat(countKey(), s)) + case s: MinMax[_] => Seq(KeyAndStat(minMaxKey(name(s.attribute)), s)) + case s: TopK[_] => Seq(KeyAndStat(topKKey(name(s.attribute)), s)) + case s: Histogram[_] => Seq(KeyAndStat(histogramKey(name(s.attribute)), s)) case s: Frequency[_] => val attribute = name(s.attribute) if (s.dtgIndex == -1) { - Seq((frequencyKey(attribute), s)) + Seq(KeyAndStat(frequencyKey(attribute), s)) } else { // split up the frequency and store by week - s.splitByWeek.map { case (w, f) => (frequencyKey(attribute, w), f) } + s.splitByWeek.map { case (w, f) => KeyAndStat(frequencyKey(attribute, w), f) } } case s: Z3Histogram => val geom = name(s.geomIndex) val dtg = name(s.dtgIndex) // split up the z3 histogram and store by week - s.splitByWeek.map { case (w, z) => (histogramKey(geom, dtg, w), z) } + s.splitByWeek.map { case (w, z) => KeyAndStat(histogramKey(geom, dtg, w), z) } - case _ => throw new NotImplementedError("Only Count, Frequency, MinMax and Histogram stats are tracked") + case _ => throw new NotImplementedError("Only Count, Frequency, MinMax, TopK and Histogram stats are tracked") } } @@ -320,27 +324,38 @@ class GeoMesaMetadataStats(val ds: AccumuloDataStore, statsTable: String) */ private def buildStatsFor(sft: SimpleFeatureType): String = { import GeoMesaStats._ + import org.locationtech.geomesa.utils.geotools.RichAttributeDescriptors.RichAttributeDescriptor - val attributes = statAttributesFor(sft).map(a => (a, sft.getDescriptor(a).getType.getBinding)) + // get the attributes that we will keep stats for + val stAttributes = Option(sft.getGeomField).toSeq ++ sft.getDtgField + val indexedAttributes = sft.getAttributeDescriptors.filter(d => d.isIndexed && okForStats(d)).map(_.getLocalName) + val 
flaggedAttributes = sft.getAttributeDescriptors.filter(d => d.isKeepStats && okForStats(d)).map(_.getLocalName) val count = Stat.Count() - val minMax = attributes.map(a => Stat.MinMax(a._1)) + // calculate min/max for all attributes + val minMax = (stAttributes ++ indexedAttributes ++ flaggedAttributes).distinct.map(Stat.MinMax) + + // calculate topk for indexed and flagged attributes, but not geom + date + val topK = (indexedAttributes ++ flaggedAttributes).distinct.map(Stat.TopK) + + // calculate frequencies only for indexed attributes val frequencies = { - import org.locationtech.geomesa.utils.geotools.RichAttributeDescriptors.RichAttributeDescriptor - val indexed = attributes.filter { case (a, _) => sft.getDescriptor(a).isIndexed } + val descriptors = indexedAttributes.map(sft.getDescriptor) // calculate one frequency that's split by week, and one that isn't // for queries with time bounds, the split by week will be more accurate // for queries without time bounds, we save the overhead of merging the weekly splits val withDates = sft.getDtgField match { case None => Seq.empty - case Some(dtg) => indexed.map { case (a, b) => Stat.Frequency(a, dtg, defaultPrecision(b)) } + case Some(dtg) => descriptors.map(d => Stat.Frequency(d.getLocalName, dtg, defaultPrecision(d.getType.getBinding))) } - val noDates = indexed.map { case (a, b) => Stat.Frequency(a, defaultPrecision(b)) } + val noDates = descriptors.map(d => Stat.Frequency(d.getLocalName, defaultPrecision(d.getType.getBinding))) withDates ++ noDates } - val histograms = attributes.map { case (attribute, binding) => + // calculate histograms for all indexed attributes and geom/date + val histograms = (stAttributes ++ indexedAttributes).distinct.map { attribute => + val binding = sft.getDescriptor(attribute).getType.getBinding // calculate the endpoints for the histogram // the histogram will expand as needed, but this is a starting point val bounds = { @@ -361,13 +376,13 @@ class GeoMesaMetadataStats(val ds:
AccumuloDataStore, statsTable: String) } val z3Histogram = for { - geom <- attributes.find(_._1 == sft.getGeomField).map(_._1) - dtg <- sft.getDtgField.filter(attributes.map(_._1).contains) + geom <- Option(sft.getGeomField).filter(stAttributes.contains) + dtg <- sft.getDtgField.filter(stAttributes.contains) } yield { Stat.Z3Histogram(geom, dtg, MaxHistogramSize) } - Stat.SeqStat(Seq(count) ++ minMax ++ histograms ++ frequencies ++ z3Histogram) + Stat.SeqStat(Seq(count) ++ minMax ++ topK ++ histograms ++ frequencies ++ z3Histogram) } } @@ -423,6 +438,7 @@ object GeoMesaMetadataStats { private val CountKey = "stats-count" private val BoundsKeyPrefix = "stats-bounds" + private val TopKKeyPrefix = "stats-topk" private val FrequencyKeyPrefix = "stats-freq" private val HistogramKeyPrefix = "stats-hist" @@ -470,6 +486,9 @@ object GeoMesaMetadataStats { // gets the key for storing a min-max private [stats] def minMaxKey(attribute: String): String = s"$BoundsKeyPrefix-$attribute" + // gets the key for storing a top-k + private [stats] def topKKey(attribute: String): String = s"$TopKKeyPrefix-$attribute" + // gets the key for storing a frequency attribute private [stats] def frequencyKey(attribute: String): String = s"$FrequencyKeyPrefix-$attribute" @@ -484,4 +503,6 @@ object GeoMesaMetadataStats { // gets the key for storing a Z3 histogram private [stats] def histogramKey(geom: String, dtg: String, week: Short): String = histogramKey(s"$geom-$dtg-$week") + + private case class KeyAndStat(key: String, stat: Stat) } \ No newline at end of file diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/stats/GeoMesaStats.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/stats/GeoMesaStats.scala index c181c1c8ad84..1d1b90bc0347 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/stats/GeoMesaStats.scala +++
b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/stats/GeoMesaStats.scala @@ -114,7 +114,6 @@ object GeoMesaStats { import java.lang.{Double => jDouble, Float => jFloat, Long => jLong} import org.locationtech.geomesa.utils.geotools.RichAttributeDescriptors.RichAttributeDescriptor - import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType // date bucket size in milliseconds for the date frequency - one day val DateFrequencyPrecision = 1000 * 60 * 60 * 24 @@ -176,13 +175,6 @@ object GeoMesaStats { // TODO GEOMESA-1217 support list/maps in stats def okForStats(d: AttributeDescriptor): Boolean = !d.isMultiValued && StatClasses.exists(_.isAssignableFrom(d.getType.getBinding)) - - // get the attributes that we will keep stats for - private [stats] def statAttributesFor(sft: SimpleFeatureType): Seq[String] = { - import scala.collection.JavaConversions._ - val indexed = sft.getAttributeDescriptors.filter(d => d.isIndexed && okForStats(d)).map(_.getLocalName) - (Option(sft.getGeomField).toSeq ++ sft.getDtgField ++ indexed).distinct - } } trait HasGeoMesaStats { diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStoreStatsTest.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStoreStatsTest.scala index c349acda5275..727323b152d3 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStoreStatsTest.scala +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/test/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStoreStatsTest.scala @@ -34,12 +34,13 @@ class AccumuloDataStoreStatsTest extends Specification with TestWithDataStore { sequential - val spec = "name:String:index=true,dtg:Date,*geom:Point:srid=4326" + // note: attributes that are not indexed but still collect stats only 
store bounds and topK + val spec = "name:String:index=true,age:Int:keep-stats=true,height:Int,dtg:Date,*geom:Point:srid=4326" val baseMillis = { val sf = new ScalaSimpleFeature("", sft) - sf.setAttribute(1, "2016-01-04T00:00:00.000Z") - sf.getAttribute(1).asInstanceOf[Date].getTime + sf.setAttribute(3, "2016-01-04T00:00:00.000Z") + sf.getAttribute(3).asInstanceOf[Date].getTime } val dayInMillis = new DateTime(baseMillis, DateTimeZone.UTC).plusDays(1).getMillis - baseMillis @@ -51,7 +52,18 @@ class AccumuloDataStoreStatsTest extends Specification with TestWithDataStore { ds.stats.getCount(sft) must beNone ds.stats.getBounds(sft) mustEqual wholeWorldEnvelope ds.stats.getAttributeBounds[String](sft, "name") must beNone + ds.stats.getAttributeBounds[Int](sft, "age") must beNone ds.stats.getAttributeBounds[Date](sft, "dtg") must beNone + ds.stats.getStats[TopK[String]](sft, Seq("name")) must beEmpty + ds.stats.getStats[TopK[Int]](sft, Seq("age")) must beEmpty + ds.stats.getStats[TopK[Date]](sft, Seq("dtg")) must beEmpty + ds.stats.getStats[Frequency[String]](sft, Seq("name")) must beEmpty + ds.stats.getStats[Frequency[Int]](sft, Seq("age")) must beEmpty + ds.stats.getStats[Frequency[Date]](sft, Seq("dtg")) must beEmpty + ds.stats.getStats[Histogram[String]](sft, Seq("name")) must beEmpty + ds.stats.getStats[Histogram[Int]](sft, Seq("age")) must beEmpty + ds.stats.getStats[Histogram[Date]](sft, Seq("dtg")) must beEmpty + ds.stats.getStats[Histogram[Geometry]](sft, Seq("geom")) must beEmpty } "through feature writer append" >> { @@ -59,20 +71,42 @@ class AccumuloDataStoreStatsTest extends Specification with TestWithDataStore { val sf = writer.next() sf.setAttribute(0, "alpha") - sf.setAttribute(1, "2016-01-04T00:00:00.000Z") - sf.setAttribute(2, "POINT (0 0)") + sf.setAttribute(1, 10) + sf.setAttribute(2, 10) + sf.setAttribute(3, "2016-01-04T00:00:00.000Z") + sf.setAttribute(4, "POINT (0 0)") writer.write() writer.flush() ds.stats.getCount(sft) must beSome(1) + 
ds.stats.getBounds(sft) mustEqual new ReferencedEnvelope(0, 0, 0, 0, CRS_EPSG_4326) ds.stats.getAttributeBounds[String](sft, "name") must beSome(AttributeBounds("alpha", "alpha", 1)) + ds.stats.getAttributeBounds[Int](sft, "age") must beSome(AttributeBounds(10, 10, 1)) + ds.stats.getAttributeBounds[String](sft, "height") must beNone ds.stats.getAttributeBounds[Date](sft, "dtg") must beSome(AttributeBounds(new Date(baseMillis), new Date(baseMillis), 1)) + ds.stats.getStats[TopK[String]](sft, Seq("name")).map(_.topK(10)) mustEqual Seq(Seq(("alpha", 1))) + ds.stats.getStats[TopK[Int]](sft, Seq("age")).map(_.topK(10)) mustEqual Seq(Seq((10, 1))) + ds.stats.getStats[TopK[Int]](sft, Seq("height")) must beEmpty + ds.stats.getStats[TopK[Date]](sft, Seq("dtg")).map(_.topK(10)) must beEmpty + + ds.stats.getStats[Frequency[String]](sft, Seq("name")) must haveLength(1) + ds.stats.getStats[Frequency[Int]](sft, Seq("age")) must beEmpty + ds.stats.getStats[Frequency[Int]](sft, Seq("height")) must beEmpty + ds.stats.getStats[Frequency[Date]](sft, Seq("dtg")) must beEmpty + + ds.stats.getStats[Histogram[String]](sft, Seq("name")) must haveLength(1) + ds.stats.getStats[Histogram[Int]](sft, Seq("age")) must beEmpty + ds.stats.getStats[Histogram[Date]](sft, Seq("dtg")) must haveLength(1) + ds.stats.getStats[Histogram[Geometry]](sft, Seq("geom")) must haveLength(1) + val sf2 = writer.next() sf2.setAttribute(0, "cappa") - sf2.setAttribute(1, "2016-01-04T12:00:00.000Z") - sf2.setAttribute(2, "POINT (10 10)") + sf2.setAttribute(1, 12) + sf2.setAttribute(2, 12) + sf2.setAttribute(3, "2016-01-04T12:00:00.000Z") + sf2.setAttribute(4, "POINT (10 10)") writer.write() writer.close() @@ -88,8 +122,10 @@ class AccumuloDataStoreStatsTest extends Specification with TestWithDataStore { val sf = new ScalaSimpleFeature("collection1", sft) sf.setAttribute(0, "gamma") - sf.setAttribute(1, "2016-01-05T00:00:00.000Z") - sf.setAttribute(2, "POINT (-10 -10)") + sf.setAttribute(1, Int.box(15)) + 
sf.setAttribute(2, Int.box(15)) + sf.setAttribute(3, "2016-01-05T00:00:00.000Z") + sf.setAttribute(4, "POINT (-10 -10)") val features = new DefaultFeatureCollection() features.add(sf) @@ -107,8 +143,10 @@ class AccumuloDataStoreStatsTest extends Specification with TestWithDataStore { val sf = writer.next() sf.setAttribute(0, "beta") - sf.setAttribute(1, "2016-01-04T00:00:00.000Z") - sf.setAttribute(2, "POINT (0 0)") + sf.setAttribute(1, 11) + sf.setAttribute(2, 11) + sf.setAttribute(3, "2016-01-04T00:00:00.000Z") + sf.setAttribute(4, "POINT (0 0)") writer.write() writer.close() @@ -124,8 +162,10 @@ class AccumuloDataStoreStatsTest extends Specification with TestWithDataStore { val sf = new ScalaSimpleFeature("", sft) sf.setAttribute(0, "0") - sf.setAttribute(1, "2016-01-03T00:00:00.000Z") - sf.setAttribute(2, "POINT (15 0)") + sf.setAttribute(1, Int.box(10)) + sf.setAttribute(2, Int.box(10)) + sf.setAttribute(3, "2016-01-03T00:00:00.000Z") + sf.setAttribute(4, "POINT (15 0)") val features = new SimpleFeatureReader() { val iter = Iterator(sf) @@ -145,13 +185,26 @@ class AccumuloDataStoreStatsTest extends Specification with TestWithDataStore { } "update all stats" >> { + val deleter = ds.getFeatureWriter(sftName, Transaction.AUTO_COMMIT) + while (deleter.hasNext) { + deleter.next() + deleter.remove() + } + deleter.close() + val writer = ds.getFeatureWriterAppend(sftName, Transaction.AUTO_COMMIT) (0 until 10).foreach { i => val sf = writer.next() sf.setAttribute(0, s"$i") - sf.setAttribute(1, f"2016-01-${i + 1}%02dT00:00:00.000Z") - sf.setAttribute(2, s"POINT (${i * 3} $i)") + if (i < 3) { + sf.setAttribute(1, Int.box(1)) + } else { + sf.setAttribute(1, Int.box(2)) + } + sf.setAttribute(2, Int.box(i)) + sf.setAttribute(3, f"2016-01-${i + 1}%02dT00:00:00.000Z") + sf.setAttribute(4, s"POINT (${i * 3} $i)") writer.write() } @@ -168,23 +221,40 @@ class AccumuloDataStoreStatsTest extends Specification with TestWithDataStore { // run it twice so that all our bounds are 
exact for histograms ds.stats.generateStats(sft) - ds.stats.getCount(sft) must beSome(11) + ds.stats.getCount(sft) must beSome(10) ds.stats.getBounds(sft) mustEqual new ReferencedEnvelope(0, 27, 0, 9, CRS_EPSG_4326) ds.stats.getAttributeBounds[String](sft, "name") must beSome(AttributeBounds("0", "9", 10)) + ds.stats.getAttributeBounds[Int](sft, "age") must beSome(AttributeBounds(1, 2, 2)) + ds.stats.getAttributeBounds[Int](sft, "height") must beNone ds.stats.getAttributeBounds[Date](sft, "dtg") must beSome(AttributeBounds(minDate, maxDate, 10)) + val nameTopK = ds.stats.getStats[TopK[String]](sft, Seq("name")) + nameTopK must haveLength(1) + nameTopK.head.topK(10) must containTheSameElementsAs((0 until 10).map(i => (s"$i", 1))) + + val ageTopK = ds.stats.getStats[TopK[Int]](sft, Seq("age")) + ageTopK must haveLength(1) + ageTopK.head.topK(10) mustEqual Seq((2, 7), (1, 3)) + + ds.stats.getStats[TopK[Int]](sft, Seq("height")) must beEmpty + ds.stats.getStats[TopK[Date]](sft, Seq("dtg")) must beEmpty + + val nameFrequency = ds.stats.getStats[Frequency[String]](sft, Seq("name")) + nameFrequency must haveLength(1) + forall(0 until 10)(i => nameFrequency.head.count(i.toString) mustEqual 1) + + ds.stats.getStats[Frequency[Int]](sft, Seq("age")) must beEmpty + ds.stats.getStats[Frequency[Int]](sft, Seq("height")) must beEmpty + val nameHistogram = ds.stats.getStats[Histogram[String]](sft, Seq("name")) nameHistogram must haveLength(1) nameHistogram.head.bounds mustEqual ("0", "9") nameHistogram.head.length mustEqual 1000 - nameHistogram.head.count(nameHistogram.head.indexOf("0")) mustEqual 2 - forall(1 until 10)(i => nameHistogram.head.count(nameHistogram.head.indexOf(i.toString)) mustEqual 1) - (0 until 1000).map(nameHistogram.head.count).sum mustEqual 11 + forall(0 until 10)(i => nameHistogram.head.count(nameHistogram.head.indexOf(i.toString)) mustEqual 1) + (0 until 1000).map(nameHistogram.head.count).sum mustEqual 10 - val nameFrequency = 
ds.stats.getStats[Frequency[String]](sft, Seq("name")) - nameFrequency must haveLength(1) - nameFrequency.head.count("0") mustEqual 2 - forall(1 until 10)(i => nameFrequency.head.count(i.toString) mustEqual 1) + ds.stats.getStats[Histogram[Int]](sft, Seq("age")) must beEmpty + ds.stats.getStats[Histogram[Int]](sft, Seq("height")) must beEmpty val dateHistogram = ds.stats.getStats[Histogram[Date]](sft, Seq("dtg")) dateHistogram must haveLength(1) @@ -192,7 +262,7 @@ class AccumuloDataStoreStatsTest extends Specification with TestWithDataStore { dateHistogram.head.length mustEqual 1000 dateHistogram.head.count(0) mustEqual 1 dateHistogram.head.count(999) mustEqual 1 - (0 until 1000).map(dateHistogram.head.count).sum mustEqual 11 + (0 until 1000).map(dateHistogram.head.count).sum mustEqual 10 val geomHistogram = ds.stats.getStats[Histogram[Geometry]](sft, Seq("geom")) geomHistogram must haveLength(1) diff --git a/geomesa-tools/README.md b/geomesa-tools/README.md index 25cca8e2f8ff..44494eeb11e3 100644 --- a/geomesa-tools/README.md +++ b/geomesa-tools/README.md @@ -56,7 +56,7 @@ This should print out the following usage text: stats-analyze Analyze statistics on a GeoMesa feature type stats-bounds View or calculate bounds on attributes in a GeoMesa feature type stats-count Estimate or calculate feature counts in a GeoMesa feature type - stats-enumerate Enumerate attribute values in a GeoMesa feature type + stats-top-k Enumerate the most frequent values in a GeoMesa feature type stats-histogram View or calculate counts of attribute in a GeoMesa feature type, grouped by sorted values tableconf Perform table configuration operations version GeoMesa Version @@ -374,12 +374,12 @@ Use the `stats-count` command to count features in your data set. Running stat query... 
Count: 182 -### stats-enumerate +### stats-top-k #### Usage - $ geomesa help stats-enumerate - Enumerate attribute values in a GeoMesa feature type - Usage: stats-enumerate [options] + $ geomesa help stats-top-k + Enumerate the most frequent values in a GeoMesa feature type + Usage: stats-top-k [options] Options: -a, --attributes Attributes to evaluate (use multiple flags or separate with commas) @@ -388,6 +388,8 @@ Use the `stats-count` command to count features in your data set. Accumulo authorizations * -c, --catalog Catalog table name for GeoMesa + -k + Number of top values to show -q, --cql CQL predicate * -f, --feature-name @@ -407,10 +409,10 @@ Use the `stats-count` command to count features in your data set. Zookeepers (host[:port], comma separated) #### Example: - $ geomesa stats-enumerate -u username -p password -i instance -z zoo1,zoo2,zoo3 -c geomesa.data \ - -f twitter -a user_id + $ geomesa stats-top-k -u username -p password -i instance -z zoo1,zoo2,zoo3 -c geomesa.data \ + -f twitter -a user_id --no-cache Running stat query... 
- Values for 'user_id': + Top values for 'user_id': 3144822634 (26383) 388009236 (20457) 497145453 (19514) diff --git a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/accumulo/AccumuloRunner.scala b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/accumulo/AccumuloRunner.scala index ce3e127aac6b..c13951ea13e2 100644 --- a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/accumulo/AccumuloRunner.scala +++ b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/accumulo/AccumuloRunner.scala @@ -37,7 +37,7 @@ object AccumuloRunner extends Runner { new StatsAnalyzeCommand(jc), new StatsBoundsCommand(jc), new StatsCountCommand(jc), - new StatsEnumerateCommand(jc), + new StatsTopKCommand(jc), new StatsHistogramCommand(jc) ) } diff --git a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/accumulo/commands/stats/StatsAnalyzeCommand.scala b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/accumulo/commands/stats/StatsAnalyzeCommand.scala index c5e958c1efd9..0cc0495ddabf 100644 --- a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/accumulo/commands/stats/StatsAnalyzeCommand.scala +++ b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/accumulo/commands/stats/StatsAnalyzeCommand.scala @@ -65,7 +65,7 @@ class StatsAnalyzeCommand(parent: JCommander) extends CommandWithCatalog(parent) logger.info("Stats analyzed:") println(strings.mkString(" ", "\n ", "")) - logger.info("Use 'stats-histogram' or 'stats-count' commands for more details") + logger.info("Use 'stats-histogram', 'stats-top-k' or 'stats-count' commands for more details") } } diff --git a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/accumulo/commands/stats/StatsBoundsCommand.scala b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/accumulo/commands/stats/StatsBoundsCommand.scala index b3799cc6e91f..93ab2490abed 100644 --- 
a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/accumulo/commands/stats/StatsBoundsCommand.scala +++ b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/accumulo/commands/stats/StatsBoundsCommand.scala @@ -60,4 +60,4 @@ class StatsBoundsCommand(parent: JCommander) extends CommandWithCatalog(parent) } @Parameters(commandDescription = "View or calculate bounds on attributes in a GeoMesa feature type") -class StatsBoundsParameters extends StatsParams with CachedStatsParams with AttributeStatsParams \ No newline at end of file +class StatsBoundsParameters extends StatsParams with AttributeStatsParams \ No newline at end of file diff --git a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/accumulo/commands/stats/StatsCommand.scala b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/accumulo/commands/stats/StatsCommand.scala index b979d3856a5e..4293d73b8bc4 100644 --- a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/accumulo/commands/stats/StatsCommand.scala +++ b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/accumulo/commands/stats/StatsCommand.scala @@ -38,9 +38,7 @@ object StatsCommand { } } -trait StatsParams extends GeoMesaConnectionParams with FeatureTypeNameParam with OptionalCQLFilterParam - -trait CachedStatsParams { +trait StatsParams extends GeoMesaConnectionParams with FeatureTypeNameParam with OptionalCQLFilterParam { @Parameter(names = Array("--no-cache"), description = "Calculate against the data set instead of using cached statistics (may be slow)") var exact: Boolean = false } diff --git a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/accumulo/commands/stats/StatsCountCommand.scala b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/accumulo/commands/stats/StatsCountCommand.scala index 5221ef6fc4de..ad207ce14655 100644 --- a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/accumulo/commands/stats/StatsCountCommand.scala +++ 
b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/accumulo/commands/stats/StatsCountCommand.scala @@ -35,4 +35,4 @@ class StatsCountCommand(parent: JCommander) extends CommandWithCatalog(parent) w } @Parameters(commandDescription = "Estimate or calculate feature counts in a GeoMesa feature type") -class StatsCountParameters extends StatsParams with CachedStatsParams \ No newline at end of file +class StatsCountParameters extends StatsParams \ No newline at end of file diff --git a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/accumulo/commands/stats/StatsEnumerateCommand.scala b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/accumulo/commands/stats/StatsEnumerateCommand.scala deleted file mode 100644 index 99e7cad7ba92..000000000000 --- a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/accumulo/commands/stats/StatsEnumerateCommand.scala +++ /dev/null @@ -1,72 +0,0 @@ -/*********************************************************************** -* Copyright (c) 2013-2016 Commonwealth Computer Research, Inc. -* All rights reserved. This program and the accompanying materials -* are made available under the terms of the Apache License, Version 2.0 -* which accompanies this distribution and is available at -* http://www.opensource.org/licenses/apache2.0.php. 
-*************************************************************************/ - -package org.locationtech.geomesa.tools.accumulo.commands.stats - -import com.beust.jcommander.{JCommander, Parameters} -import com.typesafe.scalalogging.LazyLogging -import org.geotools.filter.text.ecql.ECQL -import org.locationtech.geomesa.tools.accumulo.commands.CommandWithCatalog -import org.locationtech.geomesa.utils.stats.{EnumerationStat, Stat} -import org.opengis.filter.Filter - -import scala.math.Ordering - -class StatsEnumerateCommand(parent: JCommander) extends CommandWithCatalog(parent) with LazyLogging { - - override val command = "stats-enumerate" - override val params = new StatsEnumerateParameters - - override def execute() = { - val sft = ds.getSchema(params.featureName) - val attributes = StatsCommand.getAttributes(sft, params) - val filter = Option(params.cqlFilter).map(ECQL.toFilter).getOrElse(Filter.INCLUDE) - - logger.info("Running stat query...") - - val query = Stat.SeqStat(attributes.map(Stat.Enumeration)) - val enumerations = ds.stats.runStats[EnumerationStat[Any]](sft, query, filter) - - attributes.foreach { attribute => - println(s"Values for '$attribute':") - val enumeration = enumerations.find(_.attribute == sft.indexOf(attribute)) - enumeration match { - case None => println(" unavailable") - case Some(e) => - val binding = sft.getDescriptor(attribute).getType.getBinding - val ordering = if (classOf[Comparable[Any]].isAssignableFrom(binding)) { - new Ordering[Tuple2[Any, Long]] { - override def compare(x: (Any, Long), y: (Any, Long)): Int = { - // swap positions to get reverse sorting with large counts first - val compareCount = y._2.compareTo(x._2) - if (compareCount != 0) { - compareCount - } else { - x._1.asInstanceOf[Comparable[Any]].compareTo(y._1) - } - } - } - } else { - new Ordering[Tuple2[Any, Long]] { - override def compare(x: (Any, Long), y: (Any, Long)): Int = { - // swap positions to get reverse sorting with large counts first - 
y._2.compareTo(x._2) - } - } - } - val stringify = Stat.stringifier(binding) - e.frequencies.toSeq.sorted(ordering).foreach { case (value, count) => - println(s" ${stringify(value)} ($count)") - } - } - } - } -} - -@Parameters(commandDescription = "Enumerate attribute values in a GeoMesa feature type") -class StatsEnumerateParameters extends StatsParams with AttributeStatsParams diff --git a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/accumulo/commands/stats/StatsHistogramCommand.scala b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/accumulo/commands/stats/StatsHistogramCommand.scala index 1d7eca66906c..8d7588297828 100644 --- a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/accumulo/commands/stats/StatsHistogramCommand.scala +++ b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/accumulo/commands/stats/StatsHistogramCommand.scala @@ -140,7 +140,7 @@ class StatsHistogramCommand(parent: JCommander) extends CommandWithCatalog(paren object StatsHistogramCommand { @Parameters(commandDescription = "View or calculate counts of attribute in a GeoMesa feature type, grouped by sorted values") - class StatsHistogramParams extends StatsParams with CachedStatsParams with AttributeStatsParams { + class StatsHistogramParams extends StatsParams with AttributeStatsParams { @Parameter(names = Array("-b", "--bins"), description = "How many bins the data will be divided into. 
/***********************************************************************
* Copyright (c) 2013-2016 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
*************************************************************************/

package org.locationtech.geomesa.tools.accumulo.commands.stats

import com.beust.jcommander.{JCommander, Parameter, Parameters}
import com.typesafe.scalalogging.LazyLogging
import org.geotools.filter.text.ecql.ECQL
import org.locationtech.geomesa.tools.accumulo.commands.CommandWithCatalog
import org.locationtech.geomesa.utils.stats.{Stat, TopK}
import org.opengis.filter.Filter

import scala.math.Ordering

/**
 * Command to display the most frequently occurring values (top-k) for attributes
 * in a GeoMesa feature type, either from cached stats or via an exact stat query.
 *
 * @param parent jcommander instance this command registers with
 */
class StatsTopKCommand(parent: JCommander) extends CommandWithCatalog(parent) with LazyLogging {

  override val command = "stats-top-k"
  override val params = new StatsTopKParameters

  override def execute() = {
    val sft = ds.getSchema(params.featureName)
    val attributes = StatsCommand.getAttributes(sft, params)
    // default to all features if no CQL filter was supplied
    val filter = Option(params.cqlFilter).map(ECQL.toFilter).getOrElse(Filter.INCLUDE)
    val k = Option(params.k).map(_.intValue)

    val results = if (params.exact) {
      // scan the data to compute exact counts
      logger.info("Running stat query...")
      val query = Stat.SeqStat(attributes.map(Stat.TopK))
      ds.stats.runStats[TopK[Any]](sft, query, filter)
    } else {
      // use pre-computed (cached) stats - note: ignores any CQL filter
      ds.stats.getStats[TopK[Any]](sft, attributes)
    }

    attributes.foreach { attribute =>
      println(s"Top values for '$attribute':")
      val stat = results.find(_.attribute == sft.indexOf(attribute))
      stat match {
        case None => println(" unavailable")
        case Some(s) =>
          val binding = sft.getDescriptor(attribute).getType.getBinding
          val ordering = StatsTopKCommand.ordering(binding)
          val stringify = Stat.stringifier(binding)
          // if no k was given, show everything the sketch tracked
          s.topK(k.getOrElse(s.size)).sorted(ordering).foreach { case (value, count) =>
            println(s" ${stringify(value)} ($count)")
          }
      }
    }
  }
}

@Parameters(commandDescription = "Enumerate the most frequent values in a GeoMesa feature type")
class StatsTopKParameters extends StatsParams with AttributeStatsParams {
  @Parameter(names = Array("-k"), description = "Number of top values to show")
  var k: Integer = null
}

object StatsTopKCommand {

  /**
   * Ordering for (value, count) pairs: largest counts first. When the attribute
   * binding is Comparable, ties on count are broken by the value, ascending.
   *
   * @param binding runtime class of the attribute values
   * @return ordering suitable for sorting top-k results for display
   */
  def ordering(binding: Class[_]): Ordering[(Any, Long)] = {
    if (classOf[Comparable[Any]].isAssignableFrom(binding)) {
      new Ordering[(Any, Long)] {
        override def compare(x: (Any, Long), y: (Any, Long)): Int = {
          // swap positions to get reverse sorting with large counts first
          val compareCount = y._2.compareTo(x._2)
          if (compareCount != 0) {
            compareCount
          } else {
            x._1.asInstanceOf[Comparable[Any]].compareTo(y._1)
          }
        }
      }
    } else {
      new Ordering[(Any, Long)] {
        override def compare(x: (Any, Long), y: (Any, Long)): Int = {
          // swap positions to get reverse sorting with large counts first
          y._2.compareTo(x._2)
        }
      }
    }
  }
}
a/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/geotools/Conversions.scala +++ b/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/geotools/Conversions.scala @@ -122,6 +122,13 @@ object RichAttributeDescriptors { Option(ad.getUserData.get(OPT_INDEX).asInstanceOf[String]) .flatMap(c => Try(IndexCoverage.withName(c)).toOption).getOrElse(IndexCoverage.NONE) + def setKeepStats(enabled: Boolean): Unit = if (enabled) { + ad.getUserData.put(OPT_STATS, "true") + } else { + ad.getUserData.remove(OPT_STATS) + } + def isKeepStats(): Boolean = Option(ad.getUserData.get(OPT_STATS)).exists(_ == "true") + def isIndexValue(): Boolean = Option(ad.getUserData.get(OPT_INDEX_VALUE)).contains("true") def setCardinality(cardinality: Cardinality): Unit = diff --git a/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/geotools/SimpleFeatureTypes.scala b/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/geotools/SimpleFeatureTypes.scala index d3cf0fb2ed4c..844a5e7d1f3b 100644 --- a/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/geotools/SimpleFeatureTypes.scala +++ b/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/geotools/SimpleFeatureTypes.scala @@ -41,10 +41,11 @@ object SimpleFeatureTypes { val OPT_SRID = "srid" val OPT_INDEX_VALUE = "index-value" val OPT_INDEX = "index" + val OPT_STATS = "keep-stats" val OPT_CARDINALITY = "cardinality" val OPT_BIN_TRACK_ID = "bin-track-id" - val OPTS = Seq(OPT_DEFAULT, OPT_SRID, OPT_INDEX, OPT_INDEX_VALUE, OPT_CARDINALITY, OPT_BIN_TRACK_ID) + val OPTS = Seq(OPT_DEFAULT, OPT_SRID, OPT_INDEX, OPT_STATS, OPT_INDEX_VALUE, OPT_CARDINALITY, OPT_BIN_TRACK_ID) val USER_DATA_LIST_TYPE = "subtype" val USER_DATA_MAP_KEY_TYPE = "keyclass" @@ -251,6 +252,9 @@ object SimpleFeatureTypes { if (ad.isBinTrackId) { options.put(OPT_BIN_TRACK_ID, "true") } + if (ad.isKeepStats) { + options.put(OPT_STATS, "true") + } ad.getType match { case t if simpleTypeMap.contains(t.getBinding.getSimpleName) => 
SimpleAttributeSpec(ad.getLocalName, ad.getType.getBinding, options.toMap) @@ -320,6 +324,7 @@ object SimpleFeatureTypes { object AttributeSpec { val defaults = Map[String, AnyRef]( OPT_INDEX -> IndexCoverage.NONE.toString, + OPT_STATS -> java.lang.Boolean.FALSE, OPT_INDEX_VALUE -> java.lang.Boolean.FALSE, OPT_CARDINALITY -> Cardinality.UNKNOWN.toString, OPT_SRID -> Integer.valueOf(4326), @@ -346,6 +351,9 @@ object SimpleFeatureTypes { case Cardinality.LOW => options.put(OPT_CARDINALITY, Cardinality.LOW.toString) case _ => // nothing } + if (conf.getBoolean(OPT_STATS)) { + options.put(OPT_STATS, "true") + } if (conf.getBoolean(OPT_INDEX_VALUE)) { options.put(OPT_INDEX_VALUE, "true") } diff --git a/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/stats/MinMax.scala b/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/stats/MinMax.scala index 609677677df6..4541a6f6669a 100644 --- a/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/stats/MinMax.scala +++ b/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/stats/MinMax.scala @@ -48,7 +48,6 @@ class MinMax[T] private (val attribute: Int, private [stats] var hpp: HyperLogLo lazy val stringify = Stat.stringifier(ct.runtimeClass) private lazy val jsonStringify = Stat.stringifier(ct.runtimeClass, json = true) - def min: T = if (isEmpty) maxValue else minValue def max: T = if (isEmpty) minValue else maxValue def bounds: (T, T) = (min, max) diff --git a/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/stats/Stat.scala b/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/stats/Stat.scala index 1b6aec7206e1..70b1dbb5ef83 100644 --- a/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/stats/Stat.scala +++ b/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/stats/Stat.scala @@ -138,6 +138,14 @@ object Stat { */ def Enumeration(attribute: String): String = s"Enumeration(${safeString(attribute)})" + /** + * String that will be parsed into a TopK stat + * 
+ * @param attribute attribute name to evaluate + * @return + */ + def TopK(attribute: String): String = s"TopK(${safeString(attribute)})" + /** * String that will be parsed into a count min sketch stat * @@ -347,6 +355,15 @@ object Stat { } } + def topKParser: Parser[TopK[_]] = { + "TopK(" ~> argument <~ ")" ^^ { + case attribute => + val attrIndex = getAttrIndex(attribute) + val binding = sft.getDescriptor(attribute).getType.getBinding + new TopK[Any](attrIndex)(ClassTag(binding)) + } + } + def histogramParser: Parser[Histogram[_]] = { "Histogram(" ~> argument ~ "," ~ positiveInt ~ "," ~ argument ~ "," ~ argument <~ ")" ^^ { case attribute ~ "," ~ numBins ~ "," ~ lower ~ "," ~ upper => @@ -417,7 +434,7 @@ object Stat { } def statParser: Parser[Stat] = countParser | minMaxParser | iteratorStackParser | enumerationParser | - histogramParser | frequencyParser | z3HistogramParser | z3FrequencyParser + topKParser | histogramParser | frequencyParser | z3HistogramParser | z3FrequencyParser def statsParser: Parser[Stat] = { rep1sep(statParser, ";") ^^ { diff --git a/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/stats/StatSerializer.scala b/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/stats/StatSerializer.scala index b3addd449da0..2ae74f90fb77 100644 --- a/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/stats/StatSerializer.scala +++ b/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/stats/StatSerializer.scala @@ -11,6 +11,7 @@ package org.locationtech.geomesa.utils.stats import java.lang.{Double => jDouble, Float => jFloat, Long => jLong} import java.util.Date +import com.clearspring.analytics.stream.StreamSummary import com.clearspring.analytics.stream.cardinality.HyperLogLog import com.clearspring.analytics.stream.frequency.RichCountMinSketch import com.esotericsoftware.kryo.io.{Input, Output} @@ -73,12 +74,14 @@ object KryoStatSerializer { private [stats] val FrequencyByte: Byte = 6 private [stats] val 
Z3HistogramByte: Byte = 7 private [stats] val Z3FrequencyByte: Byte = 8 + private [stats] val TopKByte: Byte = 9 private [stats] def write(output: Output, sft: SimpleFeatureType, stat: Stat): Unit = { stat match { case s: CountStat => output.writeByte(CountByte); writeCount(output, s) case s: MinMax[_] => output.writeByte(MinMaxByte); writeMinMax(output, sft, s) - case s: EnumerationStat[_] => output.writeByte(EnumerationByte); writeEnumeration(output, sft, s) + case s: EnumerationStat[_] => output.writeByte(EnumerationByte); writeEnumeration(output, sft, s) + case s: TopK[_] => output.writeByte(TopKByte); writeTopK(output, sft, s) case s: Histogram[_] => output.writeByte(HistogramByte); writeHistogram(output, sft, s) case s: Frequency[_] => output.writeByte(FrequencyByte); writeFrequency(output, sft, s) case s: Z3Histogram => output.writeByte(Z3HistogramByte); writeZ3Histogram(output, sft, s) @@ -93,6 +96,7 @@ object KryoStatSerializer { case CountByte => readCount(input, immutable) case MinMaxByte => readMinMax(input, sft, immutable) case EnumerationByte => readEnumeration(input, sft, immutable) + case TopKByte => readTopK(input, sft, immutable) case HistogramByte => readHistogram(input, sft, immutable) case FrequencyByte => readFrequency(input, sft, immutable) case Z3HistogramByte => readZ3Histogram(input, sft, immutable) @@ -190,6 +194,31 @@ object KryoStatSerializer { stat } + private [stats] def writeTopK(output: Output, sft: SimpleFeatureType, stat: TopK[_]): Unit = { + output.writeInt(stat.attribute, true) + val summary = stat.summary.toBytes + output.writeInt(summary.length, true) + output.write(summary) + } + + private [stats] def readTopK(input: Input, sft: SimpleFeatureType, immutable: Boolean): TopK[_] = { + val attribute = input.readInt(true) + val summary = { + val summaryBytes = Array.ofDim[Byte](input.readInt(true)) + input.read(summaryBytes) + new StreamSummary[Any](summaryBytes) + } + + val binding = 
sft.getDescriptor(attribute).getType.getBinding + val classTag = ClassTag[Any](binding) + + if (immutable) { + new TopK[Any](attribute, summary)(classTag) with ImmutableStat + } else { + new TopK[Any](attribute, summary)(classTag) + } + } + private [stats] def writeHistogram(output: Output, sft: SimpleFeatureType, stat: Histogram[_]): Unit = { output.writeInt(stat.attribute, true) output.writeInt(stat.length, true) diff --git a/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/stats/TopK.scala b/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/stats/TopK.scala new file mode 100644 index 000000000000..a8980212df39 --- /dev/null +++ b/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/stats/TopK.scala @@ -0,0 +1,89 @@ +/*********************************************************************** +* Copyright (c) 2013-2016 Commonwealth Computer Research, Inc. +* All rights reserved. This program and the accompanying materials +* are made available under the terms of the Apache License, Version 2.0 +* which accompanies this distribution and is available at +* http://www.opensource.org/licenses/apache2.0.php. 
*************************************************************************/

package org.locationtech.geomesa.utils.stats

import com.clearspring.analytics.stream.StreamSummary
import com.typesafe.scalalogging.LazyLogging
import org.opengis.feature.simple.SimpleFeature

import scala.collection.JavaConversions._
import scala.reflect.ClassTag

/**
 * TopK stat - tracks the most frequently occurring values for a single attribute,
 * backed by a clearspring StreamSummary sketch with a fixed capacity.
 *
 * @param attribute index of the attribute to track
 * @param summary stream summary object
 * @param ct classtag
 * @tparam T attribute type binding
 */
class TopK[T](val attribute: Int,
              private [stats] var summary: StreamSummary[T] = new StreamSummary[T](TopK.StreamCapacity))(implicit ct: ClassTag[T])
    extends Stat with LazyLogging {

  import TopK.StreamCapacity

  override type S = TopK[T]

  // converters from attribute values to string/json representations for display
  lazy val stringify = Stat.stringifier(ct.runtimeClass)
  private lazy val jsonStringify = Stat.stringifier(ct.runtimeClass, json = true)

  // up to k (value, estimated count) pairs, most frequent first
  def topK(k: Int): Seq[(T, Long)] = summary.topK(k).map(c => (c.getItem, c.getCount))
  // number of distinct values currently tracked by the sketch
  def size: Int = summary.size

  // folds a feature's attribute value into the sketch; nulls are ignored
  override def observe(sf: SimpleFeature): Unit = {
    val value = sf.getAttribute(attribute).asInstanceOf[T]
    if (value != null) {
      summary.offer(value)
    }
  }

  // backs out a previously observed value
  override def unobserve(sf: SimpleFeature): Unit = {
    val value = sf.getAttribute(attribute).asInstanceOf[T]
    if (value != null) {
      // NOTE(review): offers a negative increment - assumes StreamSummary handles
      // negative counts sensibly; verify against the clearspring stream-lib docs
      summary.offer(value, -1)
    }
  }

  // non-destructive merge: returns a new stat, leaving both inputs unchanged
  override def +(other: TopK[T]): TopK[T] = {
    val merged = new TopK(attribute, new StreamSummary[T](StreamCapacity))
    merged += this
    merged += other
    merged
  }

  // folds the other sketch's tracked items and counts into this one
  override def +=(other: TopK[T]): Unit =
    other.summary.topK(StreamCapacity).foreach { counter =>
      if (counter.getCount > Int.MaxValue) {
        // StreamSummary.offer takes an int increment, so long counts must be clamped
        logger.warn(s"Truncating count greater than Int.MaxValue: ${counter.getCount}")
        summary.offer(counter.getItem, Int.MaxValue)
      } else {
        summary.offer(counter.getItem, counter.getCount.toInt)
      }
    }

  // resets by replacing the sketch entirely
  override def clear(): Unit = summary = new StreamSummary[T](StreamCapacity)

  override def isEmpty: Boolean = summary.size == 0

  // json object keyed by rank, limited to the top 10 values
  override def toJson: String =
    summary.topK(10).zipWithIndex.map { case (c, i) =>
      s""""$i" : { "value" : ${jsonStringify(c.getItem)}, "count" : ${c.getCount} }"""
    }.mkString("{ ", ",", " }")

  // equivalence compares the full ranked (value, count) sequences - note this is
  // order-sensitive for values that share the same count
  override def isEquivalent(other: Stat): Boolean = other match {
    case s: TopK[T] if summary.size == s.summary.size =>
      summary.topK(summary.size).map(c => (c.getItem, c.getCount)) ==
          s.summary.topK(summary.size).map(c => (c.getItem, c.getCount))
    case _ => false
  }
}

object TopK {
  // max number of distinct values tracked by each sketch
  val StreamCapacity = 1000
}
*************************************************************************/

package org.locationtech.geomesa.utils.stats

import java.lang.{Double => jDouble, Long => jLong}
import java.util.Date

import com.vividsolutions.jts.geom.Geometry
import org.geotools.feature.simple.SimpleFeatureBuilder
import org.junit.runner.RunWith
import org.locationtech.geomesa.utils.geotools.{GeoToolsDateFormat, SimpleFeatureTypes}
import org.locationtech.geomesa.utils.text.WKTUtils
import org.specs2.mutable.Specification
import org.specs2.runner.JUnitRunner

/**
 * Tests for the TopK stat across the supported attribute bindings
 * (string, long, double, date, geometry).
 */
@RunWith(classOf[JUnitRunner])
class TopKTest extends Specification {

  val sft = SimpleFeatureTypes.createType("topk", "name:String,score:Long,height:Double,dtg:Date,*geom:Point:srid=4326")

  val builder = new SimpleFeatureBuilder(sft)

  // 100 features with a known value distribution: 10/5/15/20/50 per distinct value
  val features1 = (0 until 100).map { i =>
    if (i < 10) {
      builder.addAll(Array[AnyRef]("name10", "10", "10.0", "2010-01-01T00:00:00.000Z", "POINT(10 0)"))
    } else if (i < 15) {
      builder.addAll(Array[AnyRef]("name15", "15", "15.0", "2015-01-01T00:00:00.000Z", "POINT(15 0)"))
    } else if (i < 30) {
      builder.addAll(Array[AnyRef]("name30", "30", "30.0", "2030-01-01T00:00:00.000Z", "POINT(30 0)"))
    } else if (i < 50) {
      builder.addAll(Array[AnyRef]("name50", "50", "50.0", "2050-01-01T00:00:00.000Z", "POINT(50 0)"))
    } else {
      builder.addAll(Array[AnyRef]("name100", "100", "100.0", "2100-01-01T00:00:00.000Z", "POINT(100 0)"))
    }
    builder.buildFeature(i.toString)
  }

  // second batch with the same distribution but disjoint values, for merge tests
  val features2 = (0 until 100).map { i =>
    if (i < 10) {
      builder.addAll(Array[AnyRef]("name10-2", "210", "10.2", "2010-01-01T02:00:00.000Z", "POINT(10 2)"))
    } else if (i < 15) {
      builder.addAll(Array[AnyRef]("name15-2", "215", "15.2", "2015-01-01T02:00:00.000Z", "POINT(15 2)"))
    } else if (i < 30) {
      builder.addAll(Array[AnyRef]("name30-2", "230", "30.2", "2030-01-01T02:00:00.000Z", "POINT(30 2)"))
    } else if (i < 50) {
      builder.addAll(Array[AnyRef]("name50-2", "250", "50.2", "2050-01-01T02:00:00.000Z", "POINT(50 2)"))
    } else {
      builder.addAll(Array[AnyRef]("name100-2", "2100", "100.2", "2100-01-01T02:00:00.000Z", "POINT(100 2)"))
    }
    builder.buildFeature(i.toString)
  }

  def createStat[T](attribute: String): TopK[T] = Stat(sft, s"TopK($attribute)").asInstanceOf[TopK[T]]

  def stringStat = createStat[String]("name")
  def longStat   = createStat[jLong]("score")
  def doubleStat = createStat[jDouble]("height")
  def dateStat   = createStat[Date]("dtg")
  def geomStat   = createStat[Geometry]("geom")

  "TopK stat" should {

    "work with strings" >> {
      "be empty initially" >> {
        val stat = stringStat
        stat.isEmpty must beTrue
        stat.topK(10) must beEmpty
      }

      "correctly calculate values" >> {
        val stat = stringStat
        features1.foreach(stat.observe)
        stat.isEmpty must beFalse
        stat.size mustEqual 5
        stat.topK(10) mustEqual Seq(("name100", 50), ("name50", 20), ("name30", 15), ("name10", 10), ("name15", 5))
      }

      "serialize and deserialize" >> {
        val stat = stringStat
        features1.foreach(stat.observe)
        val packed = StatSerializer(sft).serialize(stat)
        val unpacked = StatSerializer(sft).deserialize(packed)

        unpacked must beAnInstanceOf[TopK[String]]
        unpacked.asInstanceOf[TopK[String]].size mustEqual stat.size
        unpacked.asInstanceOf[TopK[String]].attribute mustEqual stat.attribute
        unpacked.asInstanceOf[TopK[String]].toJson mustEqual stat.toJson
        unpacked.isEquivalent(stat) must beTrue
      }

      "serialize and deserialize empty stats" >> {
        val stat = stringStat
        val packed = StatSerializer(sft).serialize(stat)
        val unpacked = StatSerializer(sft).deserialize(packed)

        unpacked must beAnInstanceOf[TopK[String]]
        unpacked.asInstanceOf[TopK[String]].size mustEqual stat.size
        unpacked.asInstanceOf[TopK[String]].attribute mustEqual stat.attribute
        unpacked.asInstanceOf[TopK[String]].toJson mustEqual stat.toJson
        unpacked.isEquivalent(stat) must beTrue
      }

      "deserialize as immutable value" >> {
        val stat = stringStat
        features1.foreach(stat.observe)
        val packed = StatSerializer(sft).serialize(stat)
        val unpacked = StatSerializer(sft).deserialize(packed, immutable = true)

        unpacked must beAnInstanceOf[TopK[String]]
        unpacked.asInstanceOf[TopK[String]].size mustEqual stat.size
        unpacked.asInstanceOf[TopK[String]].attribute mustEqual stat.attribute
        unpacked.asInstanceOf[TopK[String]].toJson mustEqual stat.toJson
        unpacked.isEquivalent(stat) must beTrue

        // immutable deserialization must reject all mutation
        unpacked.clear must throwAn[Exception]
        unpacked.+=(stat) must throwAn[Exception]
        unpacked.observe(features1.head) must throwAn[Exception]
        unpacked.unobserve(features1.head) must throwAn[Exception]
      }

      "combine two TopKs" >> {
        val stat = stringStat
        val stat2 = stringStat

        features1.foreach(stat.observe)
        features2.foreach(stat2.observe)

        stat2.size mustEqual 5
        stat2.topK(10) mustEqual Seq(("name100-2", 50), ("name50-2", 20), ("name30-2", 15), ("name10-2", 10), ("name15-2", 5))

        stat += stat2

        stat.size mustEqual 10

        stat.topK(10) mustEqual Seq(("name100", 50), ("name100-2", 50), ("name50", 20), ("name50-2", 20),
          ("name30", 15), ("name30-2", 15), ("name10", 10), ("name10-2", 10), ("name15", 5), ("name15-2", 5))

        // += must not modify the right-hand side
        stat2.size mustEqual 5
        stat2.topK(10) mustEqual Seq(("name100-2", 50), ("name50-2", 20), ("name30-2", 15), ("name10-2", 10), ("name15-2", 5))
      }

      "clear" >> {
        val stat = stringStat
        features1.foreach(stat.observe)
        stat.clear()

        stat.isEmpty must beTrue
        stat.size mustEqual 0
        stat.topK(10) must beEmpty
      }
    }

    "work with longs" >> {
      "correctly calculate values" >> {
        val stat = longStat
        features1.foreach(stat.observe)
        stat.isEmpty must beFalse
        stat.size mustEqual 5
        stat.topK(10) mustEqual Seq((100L, 50), (50L, 20), (30L, 15), (10L, 10), (15L, 5))
      }

      "serialize and deserialize" >> {
        val stat = longStat
        features1.foreach(stat.observe)
        val packed = StatSerializer(sft).serialize(stat)
        val unpacked = StatSerializer(sft).deserialize(packed)

        unpacked must beAnInstanceOf[TopK[jLong]]
        unpacked.asInstanceOf[TopK[jLong]].size mustEqual stat.size
        unpacked.asInstanceOf[TopK[jLong]].attribute mustEqual stat.attribute
        unpacked.asInstanceOf[TopK[jLong]].toJson mustEqual stat.toJson
        unpacked.isEquivalent(stat) must beTrue
      }
    }

    "work with doubles" >> {
      "correctly calculate values" >> {
        val stat = doubleStat
        features1.foreach(stat.observe)
        stat.isEmpty must beFalse
        stat.size mustEqual 5
        stat.topK(10) mustEqual Seq((100.0, 50), (50.0, 20), (30.0, 15), (10.0, 10), (15.0, 5))
      }

      "serialize and deserialize" >> {
        val stat = doubleStat
        features1.foreach(stat.observe)
        val packed = StatSerializer(sft).serialize(stat)
        val unpacked = StatSerializer(sft).deserialize(packed)

        unpacked must beAnInstanceOf[TopK[jDouble]]
        unpacked.asInstanceOf[TopK[jDouble]].size mustEqual stat.size
        unpacked.asInstanceOf[TopK[jDouble]].attribute mustEqual stat.attribute
        unpacked.asInstanceOf[TopK[jDouble]].toJson mustEqual stat.toJson
        unpacked.isEquivalent(stat) must beTrue
      }
    }

    "work with dates" >> {

      def toDate(year: Int) = GeoToolsDateFormat.parseDateTime(f"2$year%03d-01-01T00:00:00.000Z").toDate

      "correctly calculate values" >> {
        val stat = dateStat
        features1.foreach(stat.observe)
        stat.isEmpty must beFalse
        stat.size mustEqual 5
        stat.topK(10) mustEqual Seq((toDate(100), 50), (toDate(50), 20), (toDate(30), 15), (toDate(10), 10), (toDate(15), 5))
      }

      "serialize and deserialize" >> {
        val stat = dateStat
        features1.foreach(stat.observe)
        val packed = StatSerializer(sft).serialize(stat)
        val unpacked = StatSerializer(sft).deserialize(packed)

        unpacked must beAnInstanceOf[TopK[Date]]
        unpacked.asInstanceOf[TopK[Date]].size mustEqual stat.size
        unpacked.asInstanceOf[TopK[Date]].attribute mustEqual stat.attribute
        unpacked.asInstanceOf[TopK[Date]].toJson mustEqual stat.toJson
        unpacked.isEquivalent(stat) must beTrue
      }
    }

    "work with geometries" >> {

      def toGeom(lon: Int) = WKTUtils.read(s"POINT($lon 0)")

      "correctly calculate values" >> {
        val stat = geomStat
        features1.foreach(stat.observe)
        stat.isEmpty must beFalse
        stat.size mustEqual 5
        stat.topK(10) mustEqual Seq((toGeom(100), 50), (toGeom(50), 20), (toGeom(30), 15), (toGeom(10), 10), (toGeom(15), 5))
      }

      "serialize and deserialize" >> {
        val stat = geomStat
        features1.foreach(stat.observe)
        val packed = StatSerializer(sft).serialize(stat)
        val unpacked = StatSerializer(sft).deserialize(packed)

        unpacked must beAnInstanceOf[TopK[Geometry]]
        unpacked.asInstanceOf[TopK[Geometry]].size mustEqual stat.size
        unpacked.asInstanceOf[TopK[Geometry]].attribute mustEqual stat.attribute
        unpacked.asInstanceOf[TopK[Geometry]].toJson mustEqual stat.toJson
        unpacked.isEquivalent(stat) must beTrue
      }
    }
  }
}