diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 0c364c57f3..28ed968efd 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -10,6 +10,10 @@ Unreleased
 Added
 ~~~~~
 
+* finatra-kafka-streams: Adding missing ScalaDocs. Adding metric for elapsed state
+  restore time. RocksDB configuration now contains a flag for adjusting the number
+  of cache shard bits, ``rocksdb.block.cache.shard.bits``. ``PHAB_ID=D255771``
+
 * finatra-jackson: Added @Pattern annotation to support finatra/jackson for regex pattern
   validation on string values. ``PHAB_ID=D259719``
 
diff --git a/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/config/FinatraRocksDBConfig.scala b/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/config/FinatraRocksDBConfig.scala
index 93fd29f238..43bc8e8da4 100644
--- a/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/config/FinatraRocksDBConfig.scala
+++ b/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/config/FinatraRocksDBConfig.scala
@@ -25,6 +25,7 @@ import org.rocksdb.{
 object FinatraRocksDBConfig {
 
   val RocksDbBlockCacheSizeConfig = "rocksdb.block.cache.size"
+  val RocksDbBlockCacheShardBitsConfig = "rocksdb.block.cache.shard.bits"
   val RocksDbLZ4Config = "rocksdb.lz4"
   val RocksDbEnableStatistics = "rocksdb.statistics"
   val RocksDbStatCollectionPeriodMs = "rocksdb.statistics.collection.period.ms"
@@ -44,6 +45,9 @@ object FinatraRocksDBConfig {
   }
 }
 
+/**
+ * Maintains the RocksDB configuration used by Kafka Streams.
+ */
 class FinatraRocksDBConfig extends RocksDBConfigSetter with Logging {
 
   //See https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#other-general-options
@@ -55,7 +59,7 @@ class FinatraRocksDBConfig extends RocksDBConfigSetter with Logging {
     if (FinatraRocksDBConfig.SharedBlockCache == null) {
       val blockCacheSize =
         getBytesOrDefault(configs, FinatraRocksDBConfig.RocksDbBlockCacheSizeConfig, 100.megabytes)
-      val numShardBits = 1 //TODO: Make configurable so this can be increased for multi-threaded queryable state access
+      val numShardBits = getIntOrDefault(configs, FinatraRocksDBConfig.RocksDbBlockCacheShardBitsConfig, 1)
       FinatraRocksDBConfig.SharedBlockCache = new LRUCache(blockCacheSize, numShardBits)
     }
 
diff --git a/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/internal/listeners/FinatraStateRestoreListener.scala b/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/internal/listeners/FinatraStateRestoreListener.scala
index b52d2a2308..d28e973b6d 100644
--- a/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/internal/listeners/FinatraStateRestoreListener.scala
+++ b/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/kafkastreams/internal/listeners/FinatraStateRestoreListener.scala
@@ -4,18 +4,34 @@ import com.twitter.finagle.stats.StatsReceiver
 import com.twitter.util.logging.Logging
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.streams.processor.StateRestoreListener
+import org.joda.time.DateTimeUtils
 
-class FinatraStateRestoreListener(
-  statsReceiver: StatsReceiver) //TODO: Add stats for restoration (e.g. total time)
+/**
+ * A [[StateRestoreListener]] that emits logs and metrics relating to state restoration.
+ *
+ * @param statsReceiver A StatsReceiver used for metric tracking.
+ */
+private[kafkastreams] class FinatraStateRestoreListener(statsReceiver: StatsReceiver)
     extends StateRestoreListener
     with Logging {
 
+  private val scopedStatReceiver = statsReceiver.scope("finatra_state_restore_listener")
+  private val totalRestoreTime =
+    scopedStatReceiver.addGauge("restore_time_elapsed_ms")(restoreTimeElapsedMs)
+
+  // Written in the restore callbacks and read by the gauge, so they must be visible across threads.
+  @volatile private var restoreTimestampStartMs: Option[Long] = None
+  @volatile private var restoreTimestampEndMs: Option[Long] = None
+
   override def onRestoreStart(
     topicPartition: TopicPartition,
     storeName: String,
     startingOffset: Long,
     endingOffset: Long
   ): Unit = {
+    // Clear the previous end time first so the gauge never computes a stale end minus a new start.
+    restoreTimestampEndMs = None
+    restoreTimestampStartMs = Some(DateTimeUtils.currentTimeMillis)
     val upToRecords = endingOffset - startingOffset
     info(
       s"${storeAndPartition(storeName, topicPartition)} start restoring up to $upToRecords records from $startingOffset to $endingOffset"
@@ -36,12 +52,24 @@ class FinatraStateRestoreListener(
     storeName: String,
     totalRestored: Long
   ): Unit = {
+    restoreTimestampEndMs = Some(DateTimeUtils.currentTimeMillis)
     info(
-      s"${storeAndPartition(storeName, topicPartition)} finished restoring $totalRestored records"
+      s"${storeAndPartition(storeName, topicPartition)} finished restoring $totalRestored records in $restoreTimeElapsedMs ms"
     )
   }
 
-  private def storeAndPartition(storeName: String, topicPartition: TopicPartition) = {
+  private def storeAndPartition(storeName: String, topicPartition: TopicPartition): String = {
     s"$storeName topic ${topicPartition.topic}_${topicPartition.partition}"
   }
+
+  /**
+   * Milliseconds from the most recent restore start to its end, or to "now" while that restore
+   * has not ended yet. Returns 0 when no restore has started.
+   */
+  private def restoreTimeElapsedMs: Long = {
+    val currentTimestampMs = DateTimeUtils.currentTimeMillis
+    restoreTimestampEndMs.getOrElse(currentTimestampMs) - restoreTimestampStartMs.getOrElse(
+      currentTimestampMs
+    )
+  }
 }
diff --git a/kafka-streams/kafka-streams/src/test/scala/com/twitter/unittests/integration/wordcount/WordCountServerFeatureTest.scala b/kafka-streams/kafka-streams/src/test/scala/com/twitter/unittests/integration/wordcount/WordCountServerFeatureTest.scala
index e4bca39e69..7ea6f4b355 100644
--- a/kafka-streams/kafka-streams/src/test/scala/com/twitter/unittests/integration/wordcount/WordCountServerFeatureTest.scala
+++ b/kafka-streams/kafka-streams/src/test/scala/com/twitter/unittests/integration/wordcount/WordCountServerFeatureTest.scala
@@ -96,6 +96,11 @@ class WordCountServerFeatureTest extends KafkaStreamsMultiServerFeatureTest {
 
     val serverAfterRestart = createServer()
     serverAfterRestart.start()
+    val serverAfterRestartStats = InMemoryStatsUtil(serverAfterRestart.injector)
+    serverAfterRestartStats.waitForGaugeUntil(
+      "kafka/stream/finatra_state_restore_listener/restore_time_elapsed_ms",
+      _ >= 0
+    )
     textLinesTopic.publish(1L -> "world world")
     wordsWithCountsTopic.consumeAsManyMessagesUntilMap(Map("world" -> 5L))
 