From 948e630310b286013fa03c8daced5a366e3792cf Mon Sep 17 00:00:00 2001 From: Emilio Lahr-Vivaz Date: Tue, 3 Oct 2023 22:36:29 +0000 Subject: [PATCH] GEOMESA-3304 Expose latency metrics in Kafka data store --- .../index/KafkaFeatureCacheWithMetrics.scala | 45 ++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/index/KafkaFeatureCacheWithMetrics.scala b/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/index/KafkaFeatureCacheWithMetrics.scala index 443278c3b0d0..d1b4c294da15 100644 --- a/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/index/KafkaFeatureCacheWithMetrics.scala +++ b/geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/index/KafkaFeatureCacheWithMetrics.scala @@ -8,13 +8,17 @@ package org.locationtech.geomesa.kafka.index -import com.codahale.metrics.Gauge +import com.codahale.metrics.{Gauge, Histogram} import org.locationtech.geomesa.kafka.data.KafkaDataStore.{IndexConfig, LayerView} import org.locationtech.geomesa.kafka.index.FeatureStateFactory.FeatureState import org.locationtech.geomesa.kafka.index.KafkaFeatureCacheWithMetrics.SizeGauge import org.locationtech.geomesa.metrics.core.GeoMesaMetrics import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType} +import java.util.Date +import java.util.concurrent.atomic.AtomicLong +import scala.annotation.tailrec + class KafkaFeatureCacheWithMetrics( sft: SimpleFeatureType, config: IndexConfig, @@ -22,15 +26,31 @@ class KafkaFeatureCacheWithMetrics( metrics: GeoMesaMetrics ) extends KafkaFeatureCacheImpl(sft, config, views) { + import KafkaFeatureCacheWithMetrics.{DateMetrics, LastDateGauge} + import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType + metrics.gauge(sft.getTypeName, "index-size", new SizeGauge(this)) private val updates = metrics.meter(sft.getTypeName, "updates") private val removals = metrics.meter(sft.getTypeName, "removals") private val expirations = metrics.meter(sft.getTypeName, "expirations") + private val dtgMetrics = sft.getDtgIndex.map { i => + val last = metrics.gauge(sft.getTypeName, "dtg.latest", new LastDateGauge()).asInstanceOf[LastDateGauge] + val latency = metrics.histogram(sft.getTypeName, "dtg.latency.millis") + DateMetrics(i, last, latency) + } + override def put(feature: SimpleFeature): Unit = { super.put(feature) updates.mark() + dtgMetrics.foreach { case DateMetrics(index, dtg, latency) => + val date = feature.getAttribute(index).asInstanceOf[Date] + if (date != null) { + dtg.setLatest(date) + latency.update(System.currentTimeMillis() - date.getTime) + } + } } override def remove(id: String): Unit = { @@ -46,7 +66,30 @@ class KafkaFeatureCacheWithMetrics( object KafkaFeatureCacheWithMetrics { + case class DateMetrics(i: Int, last: LastDateGauge, latency: Histogram) + class SizeGauge(cache: KafkaFeatureCache) extends Gauge[Int] { override def getValue: Int = cache.size() } + + /** + * Tracks last date + */ + class LastDateGauge extends Gauge[Date] { + + private val value = new AtomicLong(0) + + override def getValue: Date = new Date(value.get) + + def setLatest(value: Date): Unit = setLatest(value.getTime) + + @tailrec + private final def setLatest(value: Long): Unit = { + val prev = this.value.get + if (prev < value && !this.value.compareAndSet(prev, value)) { + setLatest(value) + } + } + } + }