From 0df09c6b540a2e2d3cc5038b628f97eee32d5237 Mon Sep 17 00:00:00 2001
From: Matthew Dickinson
Date: Tue, 24 Sep 2019 23:03:05 +0000
Subject: [PATCH] [finatra-kafka] Sanitize topic name in MonitoringConsumer stats scope

Problem:
MonitoringConsumer does not sanitize the scope of its stats the same way the
other Kafka consumer stats are sanitized, resulting in metrics that are scoped
inconsistently within the application's Kafka consumer.

Solution:
Use the existing `sanitizeMetricName` method to sanitize the stats scope.

Result:
MonitoringConsumerInterceptor stats are sanitized in a consistent way.

Differential Revision: https://phabricator.twitter.biz/D373402
---
 CHANGELOG.rst                                |  5 ++-
 .../WordCountServerFeatureTest.scala         |  6 +--
 .../WordCountInMemoryServer.scala            |  2 +-
 .../WordCountInMemoryServerFeatureTest.scala |  9 +++--
 .../MonitoringConsumerInterceptor.scala      |  3 +-
 .../stats/KafkaFinagleMetricsReporter.scala  |  8 ++--
 kafka/src/test/scala/BUILD                   | 40 +++++++++----------
 7 files changed, 40 insertions(+), 33 deletions(-)

diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 61547a5b09..180def3a9a 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -10,6 +10,9 @@ Unreleased
 Fixed
 ~~~~~
 
+* finatra-kafka: Sanitize topic name in MonitoringConsumer stats scope
+  ``PHAB_ID=D373402``
+
 * inject-server: Fix printing of all stats from the underlying `InMemoryStatsReceiver`
   in the `eventually` loop for stat assertion. Address finatra/kafka test logging for
   finatra/kakfa-streams/kafka-streams and finatra/kafka. ``PHAB__ID=D372108``
@@ -17,7 +20,7 @@ Fixed
 * inject-logback: A `NullReferenceException` could be thrown during metrics collection
   due to an incorrect logback.xml configuration. This has been fixed.
   ``PHAB_ID=D369234``
-
+
 19.9.0
 ------
 
diff --git a/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/integration/wordcount/WordCountServerFeatureTest.scala b/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/integration/wordcount/WordCountServerFeatureTest.scala
index f2b8254ace..1b559e26a9 100644
--- a/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/integration/wordcount/WordCountServerFeatureTest.scala
+++ b/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/integration/wordcount/WordCountServerFeatureTest.scala
@@ -72,7 +72,7 @@ class WordCountServerFeatureTest extends KafkaStreamsMultiServerFeatureTest {
     )
     assertTimeSincePublishedSet(
       serverBeforeRestart,
-      "kafka/consumer/wordcount-prod-CountsStore-repartition/time_since_record_published_ms"
+      "kafka/consumer/wordcount_prod_CountsStore_repartition/time_since_record_published_ms"
     )
 
     serverBeforeRestart.close()
@@ -90,10 +90,10 @@ class WordCountServerFeatureTest extends KafkaStreamsMultiServerFeatureTest {
     // val serverAfterRestartStats = new InMemoryStatsUtil(serverAfterRestart.injector)
     // serverAfterRestartStats.gauges.waitForUntil("kafka/thread2/restore_consumer/records_consumed_total", _ > 0)
     serverAfterRestartStats.stats.get(
-      "kafka/consumer/wordcount-prod-CountsStore-changelog/time_since_record_published_ms"
+      "kafka/consumer/wordcount_prod_CountsStore_changelog/time_since_record_published_ms"
     ) should be(None)
     serverAfterRestartStats.stats.get(
-      "kafka/consumer/wordcount-prod-CountsStore-changelog/time_since_record_timestamp_ms"
+      "kafka/consumer/wordcount_prod_CountsStore_changelog/time_since_record_timestamp_ms"
     ) should be(None)
     serverAfterRestart.close()
     Await.result(serverAfterRestart.mainResult)
diff --git a/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/integration/wordcount_in_memory/WordCountInMemoryServer.scala b/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/integration/wordcount_in_memory/WordCountInMemoryServer.scala
index fc50cb86ce..83cb051c58 100644
--- a/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/integration/wordcount_in_memory/WordCountInMemoryServer.scala
+++ b/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/integration/wordcount_in_memory/WordCountInMemoryServer.scala
@@ -16,7 +16,7 @@ class WordCountInMemoryServer extends KafkaStreamsTwitterServer {
 
   override protected def configureKafkaStreams(builder: StreamsBuilder): Unit = {
     builder.asScala
-      .stream("TextLinesTopic")(Consumed.`with`(Serdes.Bytes, Serdes.String))
+      .stream("text-lines-topic")(Consumed.`with`(Serdes.Bytes, Serdes.String))
       .flatMapValues(_.split(' '))
       .groupBy((_, word) => word)(Grouped.`with`(Serdes.String, Serdes.String))
       .count()(
diff --git a/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/integration/wordcount_in_memory/WordCountInMemoryServerFeatureTest.scala b/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/integration/wordcount_in_memory/WordCountInMemoryServerFeatureTest.scala
index ff647fb597..14f04282bf 100644
--- a/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/integration/wordcount_in_memory/WordCountInMemoryServerFeatureTest.scala
+++ b/kafka-streams/kafka-streams/src/test/scala/com/twitter/finatra/kafkastreams/integration/wordcount_in_memory/WordCountInMemoryServerFeatureTest.scala
@@ -13,14 +13,14 @@ class WordCountInMemoryServerFeatureTest extends KafkaStreamsFeatureTest {
     disableTestLogging = true
   )
 
-  private val textLinesTopic = kafkaTopic(ScalaSerdes.Long, Serdes.String, "TextLinesTopic")
+  private val textLinesTopic = kafkaTopic(ScalaSerdes.Long, Serdes.String, "text-lines-topic")
   private val wordsWithCountsTopic = kafkaTopic(Serdes.String, Serdes.Long, "WordsWithCountsTopic")
 
   test("word count") {
     server.start()
 
     textLinesTopic.publish(1L -> "hello world hello")
-    waitForKafkaMetric("kafka/thread1/consumer/TextLinesTopic/records_consumed_total", 1)
+    waitForKafkaMetric("kafka/thread1/consumer/text_lines_topic/records_consumed_total", 1)
     wordsWithCountsTopic.consumeMessages(numMessages = 3) should contain theSameElementsAs Seq(
       "world" -> 1,
       "hello" -> 1,
@@ -36,7 +36,7 @@ class WordCountInMemoryServerFeatureTest extends KafkaStreamsFeatureTest {
     )
 
     textLinesTopic.publish(1L -> "world world")
-    waitForKafkaMetric("kafka/thread1/consumer/TextLinesTopic/records_consumed_total", 2)
+    waitForKafkaMetric("kafka/thread1/consumer/text_lines_topic/records_consumed_total", 2)
     wordsWithCountsTopic.consumeMessages(numMessages = 2) should contain theSameElementsAs Seq(
       "world" -> 2,
       "world" -> 3
@@ -53,5 +53,8 @@ class WordCountInMemoryServerFeatureTest extends KafkaStreamsFeatureTest {
       "kafka/stream/state",
       2.0f
     )
+
+    assert(server.inMemoryStats.stats.get("kafka/consumer/text-lines-topic/time_since_record_timestamp_ms").isEmpty)
+    assert(server.inMemoryStats.stats.get("kafka/consumer/text_lines_topic/time_since_record_timestamp_ms").isDefined)
   }
 }
diff --git a/kafka/src/main/scala/com/twitter/finatra/kafka/interceptors/MonitoringConsumerInterceptor.scala b/kafka/src/main/scala/com/twitter/finatra/kafka/interceptors/MonitoringConsumerInterceptor.scala
index 30e0d0b5ec..eb4d4921c4 100644
--- a/kafka/src/main/scala/com/twitter/finatra/kafka/interceptors/MonitoringConsumerInterceptor.scala
+++ b/kafka/src/main/scala/com/twitter/finatra/kafka/interceptors/MonitoringConsumerInterceptor.scala
@@ -3,6 +3,7 @@ package com.twitter.finatra.kafka.interceptors
 import com.google.common.primitives.Longs
 import com.twitter.finatra.kafka.interceptors.PublishTimeProducerInterceptor._
 import com.twitter.finagle.stats.{LoadedStatsReceiver, Stat, StatsReceiver}
+import com.twitter.finatra.kafka.stats.KafkaFinagleMetricsReporter
 import com.twitter.finatra.kafka.utils.ConfigUtils
 import com.twitter.inject.Injector
 import com.twitter.util.Time
@@ -88,7 +89,7 @@ class MonitoringConsumerInterceptor extends ConsumerInterceptor[Any, Any] {
 
   private def createNewStat(topicName: String, statName: String): Stat = {
     consumerStatsReceiver
-      .scope(topicName)
+      .scope(KafkaFinagleMetricsReporter.sanitizeMetricName(topicName))
       .stat(statName)
   }
 
diff --git a/kafka/src/main/scala/com/twitter/finatra/kafka/stats/KafkaFinagleMetricsReporter.scala b/kafka/src/main/scala/com/twitter/finatra/kafka/stats/KafkaFinagleMetricsReporter.scala
index b25286029c..bbe1cd992e 100644
--- a/kafka/src/main/scala/com/twitter/finatra/kafka/stats/KafkaFinagleMetricsReporter.scala
+++ b/kafka/src/main/scala/com/twitter/finatra/kafka/stats/KafkaFinagleMetricsReporter.scala
@@ -11,7 +11,7 @@ import scala.collection.mutable
 
 object KafkaFinagleMetricsReporter {
 
-  private[kafka] val IncludeNodeMetrics = "include.node.metrics"
+  private[kafka] val IncludeNodeMetrics: String = "include.node.metrics"
 
   //Hack to allow tests to use an injected StatsReceiver suitable for assertions
   private var globalStatsReceiver: StatsReceiver = LoadedStatsReceiver
@@ -20,16 +20,16 @@ object KafkaFinagleMetricsReporter {
     globalStatsReceiver = injector.instance[StatsReceiver]
   }
 
-  def sanitizeMetricName(metricName: String) = {
+  def sanitizeMetricName(metricName: String): String = {
     KafkaFinagleMetricsReporter.notAllowedMetricPattern
       .matcher(metricName)
       .replaceAll("_")
   }
 
-  private val notAllowedMetricPattern =
+  private val notAllowedMetricPattern: Pattern =
     Pattern.compile("-| -> |: |, |\\(|\\)| |[^\\w\\d]&&[^./]")
 
-  private val rateMetricsToIgnore = Set(
+  private val rateMetricsToIgnore: Set[String] = Set(
     "batch-split-rate",
     "buffer-exhausted-rate",
     "byte-rate",
diff --git a/kafka/src/test/scala/BUILD b/kafka/src/test/scala/BUILD
index ef856816b6..4be533dbf4 100644
--- a/kafka/src/test/scala/BUILD
+++ b/kafka/src/test/scala/BUILD
@@ -1,3 +1,23 @@
+junit_tests(
+    sources = globs(
+        "com/twitter/finatra/kafka/consumers/*.scala",
+        "com/twitter/finatra/kafka/serde/*.scala",
+        "com/twitter/finatra/kafka/test/integration/*.scala",
+    ),
+    compiler_option_sets = {"fatal_warnings"},
+    strict_deps = True,
+    dependencies = [
+        ":test-deps",
+        "3rdparty/jvm/org/apache/zookeeper:zookeeper-server",
+        "3rdparty/jvm/org/slf4j:slf4j-simple",
+        "finatra/inject/inject-modules/src/test/scala:test-deps",
+        "finatra/jackson/src/test/scala:test-deps",
+        "finatra/kafka/src/main/java",
+        "finatra/kafka/src/test/resources",
+        "finatra/kafka/src/test/thrift:thrift-scala",
+    ],
+)
+
 scala_library(
     name = "test-deps",
     sources = globs(
@@ -45,23 +65,3 @@ scala_library(
         "finatra/kafka/src/main/scala",
     ],
 )
-
-junit_tests(
-    sources = globs(
-        "com/twitter/finatra/kafka/consumers/*.scala",
-        "com/twitter/finatra/kafka/serde/*.scala",
-        "com/twitter/finatra/kafka/test/integration/*.scala",
-    ),
-    compiler_option_sets = {"fatal_warnings"},
-    strict_deps = True,
-    dependencies = [
-        ":test-deps",
-        "3rdparty/jvm/org/apache/zookeeper:zookeeper-server",
-        "3rdparty/jvm/org/slf4j:slf4j-simple",
-        "finatra/inject/inject-modules/src/test/scala:test-deps",
-        "finatra/jackson/src/test/scala:test-deps",
-        "finatra/kafka/src/main/java",
-        "finatra/kafka/src/test/resources",
-        "finatra/kafka/src/test/thrift:thrift-scala",
-    ],
-)