kafka-streams: State and Store improvements

Problem

Finatra Kafka Streams is currently lacking ScalaDocs and contains
various open TODOs.

Solution

This branch introduces ScalaDocs and resolves several open TODOs.

Modifications

Additional ScalaDocs have been added.
A metric for elapsed state restore time has been added.
The number of RocksDB block cache shard bits is now configurable via a flag.

Result

RocksDB configuration now supports a `rocksdb.block.cache.shard.bits`
flag for specifying the number of block cache shard bits.
Time spent restoring state is now emitted as a metric,
`kafka/stream/finatra_state_restore_listener/restore_time_elapsed_ms`.
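For illustration only (not part of this commit), the sketch below shows one assumed way these keys could be supplied through the Kafka Streams properties that are eventually handed to the `RocksDBConfigSetter`; the application id, bootstrap servers, and the wiring itself are placeholders, and the import for `FinatraRocksDBConfig` is elided because its package path is not shown in this diff.

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

// Sketch only: route the RocksDB tuning keys through the Kafka Streams config,
// which is what FinatraRocksDBConfig.setConfig reads when a store is initialized.
val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count")        // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, classOf[FinatraRocksDBConfig])
props.put("rocksdb.block.cache.size", (100L * 1024 * 1024).toString) // 100 MB shared block cache
props.put("rocksdb.block.cache.shard.bits", "4")                     // 2^4 = 16 cache shards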

JIRA Issues: DINS-2533

Differential Revision: https://phabricator.twitter.biz/D255771
v-haka authored and jenkins committed Jan 28, 2019
1 parent 3747c1a commit afd9a17
Showing 4 changed files with 39 additions and 5 deletions.
CHANGELOG.rst: 4 additions & 0 deletions
@@ -10,6 +10,10 @@ Unreleased
Added
~~~~~

* finatra-kafka-streams: Adding missing ScalaDocs. Adding metric for elapsed state
restore time. RocksDB configuration now contains a flag for adjusting the number
of cache shard bits, `rocksdb.block.cache.shard.bits`. ``PHAB_ID=D255771``

* finatra-jackson: Added @Pattern annotation to support finatra/jackson for regex pattern
validation on string values. ``PHAB_ID=D259719``

@@ -25,6 +25,7 @@ import org.rocksdb.{
object FinatraRocksDBConfig {

val RocksDbBlockCacheSizeConfig = "rocksdb.block.cache.size"
val RocksDbBlockCacheShardBitsConfig = "rocksdb.block.cache.shard.bits"
val RocksDbLZ4Config = "rocksdb.lz4"
val RocksDbEnableStatistics = "rocksdb.statistics"
val RocksDbStatCollectionPeriodMs = "rocksdb.statistics.collection.period.ms"
@@ -44,6 +45,9 @@ object FinatraRocksDBConfig {
}
}

/**
* Maintains the RocksDB configuration used by Kafka Streams.
*/
class FinatraRocksDBConfig extends RocksDBConfigSetter with Logging {

//See https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#other-general-options
@@ -55,7 +59,7 @@ class FinatraRocksDBConfig extends RocksDBConfigSetter with Logging {
if (FinatraRocksDBConfig.SharedBlockCache == null) {
val blockCacheSize =
getBytesOrDefault(configs, FinatraRocksDBConfig.RocksDbBlockCacheSizeConfig, 100.megabytes)
val numShardBits = 1 //TODO: Make configurable so this can be increased for multi-threaded queryable state access
val numShardBits = getIntOrDefault(configs, FinatraRocksDBConfig.RocksDbBlockCacheShardBitsConfig, 1)
FinatraRocksDBConfig.SharedBlockCache = new LRUCache(blockCacheSize, numShardBits)
}
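The `getIntOrDefault` call above references a helper whose definition is collapsed in this view; below is a minimal sketch of what such a helper might look like, assuming it mirrors the existing `getBytesOrDefault` pattern (the actual implementation added by this commit is not shown here).

// Assumed shape only; the real helper is in the collapsed portion of this diff.
private def getIntOrDefault(configs: java.util.Map[String, AnyRef], key: String, default: Int): Int = {
  val value = configs.get(key)
  if (value != null) {
    value.toString.toInt
  } else {
    default
  }
}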

@@ -4,18 +4,31 @@ import com.twitter.finagle.stats.StatsReceiver
import com.twitter.util.logging.Logging
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.streams.processor.StateRestoreListener
import org.joda.time.DateTimeUtils

class FinatraStateRestoreListener(
statsReceiver: StatsReceiver) //TODO: Add stats for restoration (e.g. total time)
/**
* A [[StateRestoreListener]] that emits logs and metrics relating to state restoration.
*
* @param statsReceiver A StatsReceiver used for metric tracking.
*/
private[kafkastreams] class FinatraStateRestoreListener(statsReceiver: StatsReceiver)
extends StateRestoreListener
with Logging {

private val scopedStatReceiver = statsReceiver.scope("finatra_state_restore_listener")
private val totalRestoreTime =
scopedStatReceiver.addGauge("restore_time_elapsed_ms")(restoreTimeElapsedMs)

private var restoreTimestampStartMs: Option[Long] = None
private var restoreTimestampEndMs: Option[Long] = None

override def onRestoreStart(
topicPartition: TopicPartition,
storeName: String,
startingOffset: Long,
endingOffset: Long
): Unit = {
restoreTimestampStartMs = Some(DateTimeUtils.currentTimeMillis)
val upToRecords = endingOffset - startingOffset
info(
s"${storeAndPartition(storeName, topicPartition)} start restoring up to $upToRecords records from $startingOffset to $endingOffset"
@@ -36,12 +49,20 @@ class FinatraStateRestoreListener(
storeName: String,
totalRestored: Long
): Unit = {
restoreTimestampEndMs = Some(DateTimeUtils.currentTimeMillis)
info(
s"${storeAndPartition(storeName, topicPartition)} finished restoring $totalRestored records"
s"${storeAndPartition(storeName, topicPartition)} finished restoring $totalRestored records in $restoreTimeElapsedMs ms"
)
}

private def storeAndPartition(storeName: String, topicPartition: TopicPartition) = {
private def storeAndPartition(storeName: String, topicPartition: TopicPartition): String = {
s"$storeName topic ${topicPartition.topic}_${topicPartition.partition}"
}

private def restoreTimeElapsedMs: Long = {
val currentTimestampMs = DateTimeUtils.currentTimeMillis
restoreTimestampEndMs.getOrElse(currentTimestampMs) - restoreTimestampStartMs.getOrElse(
currentTimestampMs
)
}
}
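For context (assumed wiring, not part of this diff): the listener is package-private, so Finatra attaches it internally, but the underlying Kafka Streams hook is `setGlobalStateRestoreListener`. A rough sketch:

import com.twitter.finagle.stats.NullStatsReceiver
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder}

// Sketch only: attach the restore listener before starting the streams instance,
// so restore start/end callbacks feed the restore_time_elapsed_ms gauge.
val topology = new StreamsBuilder().build()          // placeholder topology
val kafkaStreams = new KafkaStreams(topology, props) // props as in the configuration sketch above
kafkaStreams.setGlobalStateRestoreListener(new FinatraStateRestoreListener(NullStatsReceiver))
kafkaStreams.start()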
@@ -96,6 +96,11 @@ class WordCountServerFeatureTest extends KafkaStreamsMultiServerFeatureTest {

val serverAfterRestart = createServer()
serverAfterRestart.start()
val serverAfterRestartStats = InMemoryStatsUtil(serverAfterRestart.injector)
serverAfterRestartStats.waitForGaugeUntil(
"kafka/stream/finatra_state_restore_listener/restore_time_elapsed_ms",
_ >= 0
)

textLinesTopic.publish(1L -> "world world")
wordsWithCountsTopic.consumeAsManyMessagesUntilMap(Map("world" -> 5L))
