Skip to content

Commit

Permalink
[finatra-kafka] Sanitize topic name in MonitoringConsumer stats scope
Browse files Browse the repository at this point in the history
Problem:

MonitoringConsumer does not sanitize scope of stats the same way
as the rest of the other stats are generated for Kafka consumers.
Resulting in metrics that are scoped differently within the Kafka consumer
in the application.

Solution:

Use the existing `sanitizeMetricName` method to sanitize
stats scope.

Result:

MonitoringConsumerInterceptor stats are sanitized in a consistent way

Differential Revision: https://phabricator.twitter.biz/D373402
  • Loading branch information
mattdickinson5 authored and jenkins committed Sep 24, 2019
1 parent 32833de commit 0df09c6
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 33 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.rst
Expand Up @@ -10,14 +10,17 @@ Unreleased
Fixed
~~~~~

* finatra-kafka: Sanitize topic name in MonitoringConsumer stats scope
``PHAB_ID=D373402``

* inject-server: Fix printing of all stats from the underlying `InMemoryStatsReceiver` in
the `eventually` loop for stat assertion. Address finatra/kafka test logging for
finatra/kakfa-streams/kafka-streams and finatra/kafka. ``PHAB__ID=D372108``

* inject-logback: A `NullReferenceException` could be thrown during metrics
collection due to an incorrect logback.xml configuration. This has been fixed.
``PHAB_ID=D369234``

19.9.0
------

Expand Down
Expand Up @@ -72,7 +72,7 @@ class WordCountServerFeatureTest extends KafkaStreamsMultiServerFeatureTest {
)
assertTimeSincePublishedSet(
serverBeforeRestart,
"kafka/consumer/wordcount-prod-CountsStore-repartition/time_since_record_published_ms"
"kafka/consumer/wordcount_prod_CountsStore_repartition/time_since_record_published_ms"
)

serverBeforeRestart.close()
Expand All @@ -90,10 +90,10 @@ class WordCountServerFeatureTest extends KafkaStreamsMultiServerFeatureTest {
// val serverAfterRestartStats = new InMemoryStatsUtil(serverAfterRestart.injector)
// serverAfterRestartStats.gauges.waitForUntil("kafka/thread2/restore_consumer/records_consumed_total", _ > 0)
serverAfterRestartStats.stats.get(
"kafka/consumer/wordcount-prod-CountsStore-changelog/time_since_record_published_ms"
"kafka/consumer/wordcount_prod_CountsStore_changelog/time_since_record_published_ms"
) should be(None)
serverAfterRestartStats.stats.get(
"kafka/consumer/wordcount-prod-CountsStore-changelog/time_since_record_timestamp_ms"
"kafka/consumer/wordcoun_prod_CountsStore_changelog/time_since_record_timestamp_ms"
) should be(None)
serverAfterRestart.close()
Await.result(serverAfterRestart.mainResult)
Expand Down
Expand Up @@ -16,7 +16,7 @@ class WordCountInMemoryServer extends KafkaStreamsTwitterServer {

override protected def configureKafkaStreams(builder: StreamsBuilder): Unit = {
builder.asScala
.stream("TextLinesTopic")(Consumed.`with`(Serdes.Bytes, Serdes.String))
.stream("text-lines-topic")(Consumed.`with`(Serdes.Bytes, Serdes.String))
.flatMapValues(_.split(' '))
.groupBy((_, word) => word)(Grouped.`with`(Serdes.String, Serdes.String))
.count()(
Expand Down
Expand Up @@ -13,14 +13,14 @@ class WordCountInMemoryServerFeatureTest extends KafkaStreamsFeatureTest {
disableTestLogging = true
)

private val textLinesTopic = kafkaTopic(ScalaSerdes.Long, Serdes.String, "TextLinesTopic")
private val textLinesTopic = kafkaTopic(ScalaSerdes.Long, Serdes.String, "text-lines-topic")
private val wordsWithCountsTopic = kafkaTopic(Serdes.String, Serdes.Long, "WordsWithCountsTopic")

test("word count") {
server.start()

textLinesTopic.publish(1L -> "hello world hello")
waitForKafkaMetric("kafka/thread1/consumer/TextLinesTopic/records_consumed_total", 1)
waitForKafkaMetric("kafka/thread1/consumer/text_lines_topic/records_consumed_total", 1)
wordsWithCountsTopic.consumeMessages(numMessages = 3) should contain theSameElementsAs Seq(
"world" -> 1,
"hello" -> 1,
Expand All @@ -36,7 +36,7 @@ class WordCountInMemoryServerFeatureTest extends KafkaStreamsFeatureTest {
)

textLinesTopic.publish(1L -> "world world")
waitForKafkaMetric("kafka/thread1/consumer/TextLinesTopic/records_consumed_total", 2)
waitForKafkaMetric("kafka/thread1/consumer/text_lines_topic/records_consumed_total", 2)
wordsWithCountsTopic.consumeMessages(numMessages = 2) should contain theSameElementsAs Seq(
"world" -> 2,
"world" -> 3
Expand All @@ -53,5 +53,8 @@ class WordCountInMemoryServerFeatureTest extends KafkaStreamsFeatureTest {
"kafka/stream/state",
2.0f
)

assert(server.inMemoryStats.stats.get("kafka/consumer/text-lines-topic/time_since_record_timestamp_ms").isEmpty)
assert(server.inMemoryStats.stats.get("kafka/consumer/text_lines_topic/time_since_record_timestamp_ms").isDefined)
}
}
Expand Up @@ -3,6 +3,7 @@ package com.twitter.finatra.kafka.interceptors
import com.google.common.primitives.Longs
import com.twitter.finatra.kafka.interceptors.PublishTimeProducerInterceptor._
import com.twitter.finagle.stats.{LoadedStatsReceiver, Stat, StatsReceiver}
import com.twitter.finatra.kafka.stats.KafkaFinagleMetricsReporter
import com.twitter.finatra.kafka.utils.ConfigUtils
import com.twitter.inject.Injector
import com.twitter.util.Time
Expand Down Expand Up @@ -88,7 +89,7 @@ class MonitoringConsumerInterceptor extends ConsumerInterceptor[Any, Any] {

private def createNewStat(topicName: String, statName: String): Stat = {
consumerStatsReceiver
.scope(topicName)
.scope(KafkaFinagleMetricsReporter.sanitizeMetricName(topicName))
.stat(statName)
}

Expand Down
Expand Up @@ -11,7 +11,7 @@ import scala.collection.mutable

object KafkaFinagleMetricsReporter {

private[kafka] val IncludeNodeMetrics = "include.node.metrics"
private[kafka] val IncludeNodeMetrics: String = "include.node.metrics"

//Hack to allow tests to use an injected StatsReceiver suitable for assertions
private var globalStatsReceiver: StatsReceiver = LoadedStatsReceiver
Expand All @@ -20,16 +20,16 @@ object KafkaFinagleMetricsReporter {
globalStatsReceiver = injector.instance[StatsReceiver]
}

def sanitizeMetricName(metricName: String) = {
def sanitizeMetricName(metricName: String): String = {
KafkaFinagleMetricsReporter.notAllowedMetricPattern
.matcher(metricName)
.replaceAll("_")
}

private val notAllowedMetricPattern =
private val notAllowedMetricPattern: Pattern =
Pattern.compile("-| -> |: |, |\\(|\\)| |[^\\w\\d]&&[^./]")

private val rateMetricsToIgnore = Set(
private val rateMetricsToIgnore: Set[String] = Set(
"batch-split-rate",
"buffer-exhausted-rate",
"byte-rate",
Expand Down
40 changes: 20 additions & 20 deletions kafka/src/test/scala/BUILD
@@ -1,3 +1,23 @@
junit_tests(
sources = globs(
"com/twitter/finatra/kafka/consumers/*.scala",
"com/twitter/finatra/kafka/serde/*.scala",
"com/twitter/finatra/kafka/test/integration/*.scala",
),
compiler_option_sets = {"fatal_warnings"},
strict_deps = True,
dependencies = [
":test-deps",
"3rdparty/jvm/org/apache/zookeeper:zookeeper-server",
"3rdparty/jvm/org/slf4j:slf4j-simple",
"finatra/inject/inject-modules/src/test/scala:test-deps",
"finatra/jackson/src/test/scala:test-deps",
"finatra/kafka/src/main/java",
"finatra/kafka/src/test/resources",
"finatra/kafka/src/test/thrift:thrift-scala",
],
)

scala_library(
name = "test-deps",
sources = globs(
Expand Down Expand Up @@ -45,23 +65,3 @@ scala_library(
"finatra/kafka/src/main/scala",
],
)

junit_tests(
sources = globs(
"com/twitter/finatra/kafka/consumers/*.scala",
"com/twitter/finatra/kafka/serde/*.scala",
"com/twitter/finatra/kafka/test/integration/*.scala",
),
compiler_option_sets = {"fatal_warnings"},
strict_deps = True,
dependencies = [
":test-deps",
"3rdparty/jvm/org/apache/zookeeper:zookeeper-server",
"3rdparty/jvm/org/slf4j:slf4j-simple",
"finatra/inject/inject-modules/src/test/scala:test-deps",
"finatra/jackson/src/test/scala:test-deps",
"finatra/kafka/src/main/java",
"finatra/kafka/src/test/resources",
"finatra/kafka/src/test/thrift:thrift-scala",
],
)

0 comments on commit 0df09c6

Please sign in to comment.