Skip to content

Commit

Permalink
finatra-kafka-streams: Combine FinatraTransformer with FinatraTransfo…
Browse files Browse the repository at this point in the history
…rmerV2

Problem

FinatraTransformer calls timersStore.all() on each timer fire and consequently
may have to iterate over many tombstoned timers.

Solution

Combine the original FinatraTransformer with FinatraTransformerV2, which
handles timers in a more performant way.

Result

FinatraTransformer avoids potentially having to traverse many tombstoned
timers when calling timersStores.all().

JIRA Issues: DINS-2490

Differential Revision: https://phabricator.twitter.biz/D254411
  • Loading branch information
k-liao authored and jenkins committed Jan 25, 2019
1 parent ecf2e54 commit cd455c4
Show file tree
Hide file tree
Showing 12 changed files with 163 additions and 535 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.rst
Expand Up @@ -16,6 +16,8 @@ Added
Changed
~~~~~~~

* finatra-kafka-streams: Combine FinatraTransformer with FinatraTransformerV2. ``PHAB_ID=D254411``

* finatra-thrift: The return type of `ReqRepDarkTrafficFilterModule#newFilter` has been changed from
`DarkTrafficFilter[MethodIface]` to `Filter.TypeAgnostic`. ``PHAB_ID=D261868``

Expand Down
2 changes: 1 addition & 1 deletion doc/src/sphinx/user-guide/kafka-streams/index.rst
Expand Up @@ -23,7 +23,7 @@ a fully functional service can be written by simply configuring the Kafka Stream
Transformers
~~~~~~~~~~~~

Implement custom `transformers <https://kafka.apache.org/21/javadoc/org/apache/kafka/streams/kstream/Transformer.html>`__ using `FinatraTransformerV2 <https://github.com/twitter/finatra/blob/develop/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/streams/transformer/FinatraTransformerV2.scala>`__.
Implement custom `transformers <https://kafka.apache.org/21/javadoc/org/apache/kafka/streams/kstream/Transformer.html>`__ using `FinatraTransformer <https://github.com/twitter/finatra/blob/develop/kafka-streams/kafka-streams/src/main/scala/com/twitter/finatra/streams/transformer/FinatraTransformer.scala>`__.

Aggregations
^^^^^^^^^^^^
Expand Down
Expand Up @@ -7,7 +7,7 @@ import com.twitter.finatra.streams.transformer.domain.{
Time,
TimerMetadata
}
import com.twitter.finatra.streams.transformer.{FinatraTransformerV2, PersistentTimers}
import com.twitter.finatra.streams.transformer.{FinatraTransformer, PersistentTimers}
import com.twitter.util.Duration
import org.apache.kafka.streams.processor.PunctuationType
import scala.reflect.ClassTag
Expand All @@ -31,7 +31,7 @@ class ReservoirSamplingTransformer[
countStoreName: String,
sampleStoreName: String,
timerStoreName: String)
extends FinatraTransformerV2[Key, Value, SampleKey, SampleValue](statsReceiver = statsReceiver)
extends FinatraTransformer[Key, Value, SampleKey, SampleValue](statsReceiver = statsReceiver)
with PersistentTimers {

private val numExpiredCounter = statsReceiver.counter("numExpired")
Expand Down
Expand Up @@ -7,7 +7,7 @@ import com.twitter.util.Duration
/**
* FlushingAwareServer must be mixed in to servers that rely on manually controlling when a flush/commit occurs.
* As such, this trait will be needed when using the following classes, FlushingProcessor, FlushingTransformer,
* AsyncProcessor, AsyncTransformer, FinatraTransformer, and FinatraTransformerV2
* AsyncProcessor, AsyncTransformer, and FinatraTransformer
*
* This trait sets 'kafka.commit.interval' to 'Duration.Top' to disable the normal Kafka Streams commit process.
* As such the only commits that will occur are triggered manually, thus allowing us to control when flush/commit
Expand Down
Expand Up @@ -55,7 +55,7 @@ class AggregatorTransformer[K, V, Aggregate](
queryableAfterClose: Duration,
emitUpdatedEntriesOnCommit: Boolean,
val commitInterval: Duration)
extends FinatraTransformerV2[K, V, TimeWindowed[K], WindowedValue[Aggregate]](statsReceiver)
extends FinatraTransformer[K, V, TimeWindowed[K], WindowedValue[Aggregate]](statsReceiver)
with CachingKeyValueStores[K, V, TimeWindowed[K], WindowedValue[Aggregate]]
with PersistentTimers {

Expand Down
Expand Up @@ -3,7 +3,9 @@ package com.twitter.finatra.streams.transformer
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finatra.streams.transformer.FinatraTransformer.WindowStartTime
import com.twitter.finatra.streams.transformer.domain._
import com.twitter.finatra.streams.transformer.internal.StateStoreImplicits
import com.twitter.util.Duration
import org.apache.kafka.streams.processor.PunctuationType
import org.apache.kafka.streams.state.KeyValueIterator

@deprecated("Use AggregatorTransformer", "1/7/2019")
Expand All @@ -21,12 +23,13 @@ class CompositeSumAggregator[K, A, CK <: CompositeKey[K, A]](
extends FinatraTransformer[
CK,
Int,
TimeWindowed[CK],
WindowStartTime,
TimeWindowed[K],
WindowedValue[
scala.collection.Map[A, Int]
]](timerStoreName = timerStoreName, statsReceiver = statsReceiver, cacheTimers = true) {
]](statsReceiver = statsReceiver)
with PersistentTimers
with StateStoreImplicits
with IteratorImplicits {

private val windowSizeMillis = windowSize.inMillis
private val allowedLatenessMillis = allowedLateness.inMillis
Expand All @@ -41,10 +44,15 @@ class CompositeSumAggregator[K, A, CK <: CompositeKey[K, A]](
private val putLatencyStat = statsReceiver.stat("putLatency")

private val stateStore = getKeyValueStore[TimeWindowed[CK], Int](stateStoreName)
private val timerStore = getPersistentTimerStore[WindowStartTime](
timerStoreName,
onEventTimer,
PunctuationType.STREAM_TIME
)

override def onMessage(time: Time, compositeKey: CK, count: Int): Unit = {
val windowedCompositeKey = TimeWindowed.forSize(time.hourMillis, windowSizeMillis, compositeKey)
if (windowedCompositeKey.isLate(allowedLatenessMillis, Watermark(watermark))) {
if (windowedCompositeKey.isLate(allowedLatenessMillis, Watermark(watermark.timeMillis))) {
restatementsCounter.incr()
forward(windowedCompositeKey.map { _ =>
compositeKey.primary
Expand All @@ -59,9 +67,9 @@ class CompositeSumAggregator[K, A, CK <: CompositeKey[K, A]](
if (newCount == count) {
val closeTime = windowedCompositeKey.startMs + windowSizeMillis + allowedLatenessMillis
if (emitOnClose) {
addEventTimeTimer(Time(closeTime), Close, windowedCompositeKey.startMs)
timerStore.addTimer(Time(closeTime), Close, windowedCompositeKey.startMs)
}
addEventTimeTimer(
timerStore.addTimer(
Time(closeTime + queryableAfterCloseMillis),
Expire,
windowedCompositeKey.startMs
Expand All @@ -77,16 +85,14 @@ class CompositeSumAggregator[K, A, CK <: CompositeKey[K, A]](
* TimeWindowedKey(2018-08-04T10:00:00.000Z-40-retweet) -> 4
*/
//Note: We use the cursor even for deletes to skip tombstones that may otherwise slow down the range scan
override def onEventTimer(
private def onEventTimer(
time: Time,
timerMetadata: TimerMetadata,
windowStartMs: WindowStartTime,
cursor: Option[TimeWindowed[CK]]
): TimerResult[TimeWindowed[CK]] = {
windowStartMs: WindowStartTime
): Unit = {
debug(s"onEventTimer $time $timerMetadata")
val windowIterator = stateStore.range(
cursor getOrElse TimeWindowed
.forSize(windowStartMs, windowSizeMillis, compositeKeyRangeStart),
TimeWindowed.forSize(windowStartMs, windowSizeMillis, compositeKeyRangeStart),
TimeWindowed.forSize(windowStartMs + 1, windowSizeMillis, compositeKeyRangeStart)
)

Expand All @@ -104,7 +110,7 @@ class CompositeSumAggregator[K, A, CK <: CompositeKey[K, A]](
private def onClosed(
windowStartMs: Long,
windowIterator: KeyValueIterator[TimeWindowed[CK], Int]
): TimerResult[TimeWindowed[CK]] = {
): Unit = {
windowIterator
.groupBy(
primaryKey = timeWindowed => timeWindowed.value.primary,
Expand All @@ -121,14 +127,12 @@ class CompositeSumAggregator[K, A, CK <: CompositeKey[K, A]](
)
}

deleteOrRetainTimer(windowIterator, onDeleteTimer = closedCounter.incr())
closedCounter.incr()
}

//Note: We call "put" w/ a null value instead of calling "delete" since "delete" also gets the previous value :-/
//TODO: Consider performing deletes in a transaction so that queryable state sees all or no keys per "primary key"
private def onExpired(
windowIterator: KeyValueIterator[TimeWindowed[CK], Int]
): TimerResult[TimeWindowed[CK]] = {
private def onExpired(windowIterator: KeyValueIterator[TimeWindowed[CK], Int]): Unit = {
windowIterator
.take(maxActionsPerTimer)
.foreach {
Expand All @@ -137,6 +141,6 @@ class CompositeSumAggregator[K, A, CK <: CompositeKey[K, A]](
stateStore.put(timeWindowedCompositeKey, null.asInstanceOf[Int])
}

deleteOrRetainTimer(windowIterator, onDeleteTimer = expiredCounter.incr())
expiredCounter.incr()
}
}

0 comments on commit cd455c4

Please sign in to comment.