Add configurable protection against server-bug induced resets
Includes some refactoring to pull out the common commit-progress tracking that is useful for both reset tracking and commit refreshing. To avoid keeping state twice, both features share the progress tracker; the design gets a little twisted as a result, but the logic stays contained.

Keeps the same (very small) overhead when commit refreshing and reset tracking are not enabled, and avoids doing the work twice when both are enabled.

References akka#1286
jyates committed Dec 28, 2020
1 parent 59b0c27 commit e46dcea
Showing 7 changed files with 457 additions and 69 deletions.
7 changes: 7 additions & 0 deletions core/src/main/resources/reference.conf
@@ -143,6 +143,13 @@ akka.kafka.consumer {
backoff-factor = 2.0
}

# If Kafka returns offsets for a partition that are more than this many offsets before the last requested offset
# for that partition, we treat it as a broker-side error and attempt to reset the consumer to the latest committed
# offset for the partition.
# Only used if greater than 0.
reset-protection {
threshold = 0
}
}
# // #consumer-settings
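A positive threshold enables the protection; the default of 0 leaves it off. As a minimal sketch (the 10000 value and the bootstrap address are illustrative only, and the override could just as well live in application.conf), the new setting is picked up through the usual ConsumerSettings config path:

import akka.kafka.ConsumerSettings
import com.typesafe.config.ConfigFactory
import org.apache.kafka.common.serialization.StringDeserializer

// Overlay the new setting on top of the defaults shipped in reference.conf.
// The Config handed to ConsumerSettings must be the `akka.kafka.consumer` section.
val consumerConfig = ConfigFactory
  .parseString("reset-protection.threshold = 10000")
  .withFallback(ConfigFactory.load().getConfig("akka.kafka.consumer"))

val settings = ConsumerSettings(consumerConfig, new StringDeserializer, new StringDeserializer)
  .withBootstrapServers("localhost:9092") // illustrative address

// settings.resetProtectionThreshold is now 10000; 0 (the default) disables reset protection.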

14 changes: 10 additions & 4 deletions core/src/main/scala/akka/kafka/ConsumerSettings.scala
@@ -89,6 +89,7 @@ object ConsumerSettings {
val drainingCheckInterval = config.getDuration("eos-draining-check-interval").asScala
val connectionCheckerSettings = ConnectionCheckerSettings(config.getConfig(ConnectionCheckerSettings.configPath))
val partitionHandlerWarning = config.getDuration("partition-handler-warning").asScala
val resetProtectionThreshold = config.getLong("reset-protection.threshold")

new ConsumerSettings[K, V](
properties,
@@ -110,7 +111,8 @@
enrichAsync = None,
ConsumerSettings.createKafkaConsumer,
connectionCheckerSettings,
partitionHandlerWarning
partitionHandlerWarning,
resetProtectionThreshold
)
}

@@ -266,7 +268,8 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
val enrichAsync: Option[ConsumerSettings[K, V] => Future[ConsumerSettings[K, V]]],
val consumerFactory: ConsumerSettings[K, V] => Consumer[K, V],
val connectionCheckerSettings: ConnectionCheckerSettings,
val partitionHandlerWarning: FiniteDuration
val partitionHandlerWarning: FiniteDuration,
val resetProtectionThreshold: Long
) {

/**
@@ -558,7 +561,8 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
enrichAsync: Option[ConsumerSettings[K, V] => Future[ConsumerSettings[K, V]]] = enrichAsync,
consumerFactory: ConsumerSettings[K, V] => Consumer[K, V] = consumerFactory,
connectionCheckerConfig: ConnectionCheckerSettings = connectionCheckerSettings,
partitionHandlerWarning: FiniteDuration = partitionHandlerWarning
partitionHandlerWarning: FiniteDuration = partitionHandlerWarning,
resetProtectionThreshold: Long = resetProtectionThreshold
): ConsumerSettings[K, V] =
new ConsumerSettings[K, V](
properties,
@@ -580,7 +584,8 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
enrichAsync,
consumerFactory,
connectionCheckerConfig,
partitionHandlerWarning
partitionHandlerWarning,
resetProtectionThreshold
)

/**
@@ -650,6 +655,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
s"connectionCheckerSettings=$connectionCheckerSettings," +
s"partitionHandlerWarning=${partitionHandlerWarning.toCoarsest}" +
s"enrichAsync=${enrichAsync.map(_ => "needs to be applied")}" +
s"resetProtectionThreshold=$resetProtectionThreshold" +
")"
}
}
75 changes: 75 additions & 0 deletions core/src/main/scala/akka/kafka/internal/ConsumerProgressTracking.scala
@@ -0,0 +1,75 @@
/*
* Copyright (C) 2014 - 2016 Softwaremill <https://softwaremill.com>
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.internal
import org.apache.kafka.clients.consumer.{Consumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

import scala.jdk.CollectionConverters._

/**
* Track the current state of the consumer: what offsets it has requested and committed, filtering by the current
* assignments to the consumer. When a partition is assigned to the consumer for the first time, its assigned offset
* is the current position of the partition (uses the underlying Kafka consumer to leverage the configured offset-reset
* policy).
*/
trait ConsumerProgressTracking {
var committedOffsets: Map[TopicPartition, OffsetAndMetadata] = _
var requestedOffsets: Map[TopicPartition, OffsetAndMetadata] = _

def add(offsets: Map[TopicPartition, OffsetAndMetadata]): Unit = {}
def committed(offsets: java.util.Map[TopicPartition, OffsetAndMetadata]): Unit = {}
def revoke(revokedTps: Set[TopicPartition]): Unit = {}
def assignedPositions(assignedTps: Set[TopicPartition], assignedOffsets: Map[TopicPartition, Long]): Unit = {}
def assignedPositions(assignedTps: Set[TopicPartition],
consumer: Consumer[_, _],
positionTimeout: java.time.Duration): Unit = {}
def addProgressTrackingCallback(callback: ConsumerProgressTracking): Unit = {}
}

class ConsumerProgressTrackerImpl extends ConsumerProgressTracking {
private var assignedOffsetsCallbacks: Seq[ConsumerProgressTracking] = Seq()
requestedOffsets = Map.empty[TopicPartition, OffsetAndMetadata]
committedOffsets = Map.empty[TopicPartition, OffsetAndMetadata]

override def addProgressTrackingCallback(callback: ConsumerProgressTracking): Unit = {
assignedOffsetsCallbacks = assignedOffsetsCallbacks :+ callback
}

override def add(offsets: Map[TopicPartition, OffsetAndMetadata]): Unit = {
requestedOffsets = requestedOffsets ++ offsets
assignedOffsetsCallbacks.foreach(_.add(offsets))
}

override def committed(offsets: java.util.Map[TopicPartition, OffsetAndMetadata]): Unit = {
committedOffsets = committedOffsets ++ offsets.asScala.toMap
assignedOffsetsCallbacks.foreach(_.committed(offsets))
}

override def revoke(revokedTps: Set[TopicPartition]): Unit = {
requestedOffsets = requestedOffsets -- revokedTps
committedOffsets = committedOffsets -- revokedTps
assignedOffsetsCallbacks.foreach(_.revoke(revokedTps))
}

override def assignedPositions(assignedTps: Set[TopicPartition], assignedOffsets: Map[TopicPartition, Long]): Unit = {
requestedOffsets = requestedOffsets ++ assignedOffsets.map {
case (partition, offset) =>
partition -> requestedOffsets.getOrElse(partition, new OffsetAndMetadata(offset))
}
committedOffsets = committedOffsets ++ assignedOffsets.map {
case (partition, offset) =>
partition -> committedOffsets.getOrElse(partition, new OffsetAndMetadata(offset))
}
assignedOffsetsCallbacks.foreach(_.assignedPositions(assignedTps, assignedOffsets))
}

override def assignedPositions(assignedTps: Set[TopicPartition],
consumer: Consumer[_, _],
positionTimeout: java.time.Duration): Unit = {
val assignedOffsets = assignedTps.map(tp => tp -> consumer.position(tp, positionTimeout)).toMap
assignedPositions(assignedTps, assignedOffsets)
}
}
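
As a quick illustration of the tracking described above (a hypothetical sketch using the internal API directly; the topic name and offsets are made up), a newly assigned partition is seeded with its current position, after which requested and committed offsets are tracked independently:

import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import scala.jdk.CollectionConverters._

val tracker = new ConsumerProgressTrackerImpl()
val tp = new TopicPartition("example-topic", 0)

// First assignment: requested and committed both start at the partition's position (here 100).
tracker.assignedPositions(Set(tp), Map(tp -> 100L))

// Offsets are requested for commit and later confirmed committed; both maps advance to 110.
tracker.add(Map(tp -> new OffsetAndMetadata(110L)))
tracker.committed(Map(tp -> new OffsetAndMetadata(110L)).asJava)

assert(tracker.requestedOffsets(tp).offset() == 110L)
assert(tracker.committedOffsets(tp).offset() == 110L)

// Revoking the partition drops it from both maps.
tracker.revoke(Set(tp))
assert(!tracker.requestedOffsets.contains(tp))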
103 changes: 103 additions & 0 deletions core/src/main/scala/akka/kafka/internal/ConsumerResetProtection.scala
@@ -0,0 +1,103 @@
/*
* Copyright (C) 2014 - 2016 Softwaremill <https://softwaremill.com>
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.kafka.internal

import java.util

import akka.actor.ActorRef
import akka.event.LoggingAdapter
import akka.kafka.internal.KafkaConsumerActor.Internal.Seek
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords}
import org.apache.kafka.common.TopicPartition

import scala.jdk.CollectionConverters._

trait ConsumerResetProtection {
def protect[K, V](records: ConsumerRecords[K, V]): ConsumerRecords[K, V]
}

object ConsumerResetProtection {
def apply[K, V](ref: ActorRef,
log: LoggingAdapter,
threshold: Long,
progress: () => ConsumerProgressTracking): ConsumerResetProtection = {
if (threshold > 0) new Impl(ref, log, threshold, progress()) else new Noop()
}

private final class Noop() extends ConsumerResetProtection {
override def protect[K, V](records: ConsumerRecords[K, V]): ConsumerRecords[K, V] = records
}

private final class Impl(consumer: ActorRef,
log: LoggingAdapter,
allowedMaxDeltaFromRequest: Long,
progress: ConsumerProgressTracking)
extends ConsumerResetProtection {
override def protect[K, V](records: ConsumerRecords[K, V]): ConsumerRecords[K, V] = {
// check that the fetched offsets of the records for each partition are not wildly different from the
// offsets we have seen for those partitions thus far
val safe: java.util.Map[TopicPartition, java.util.List[ConsumerRecord[K, V]]] =
records
.partitions()
.asScala
.map(tp => {
val partitionRecords = records.records(tp)
progress.requestedOffsets.get(tp) match {
case Some(requested) =>
val previouslyRequestedOffset = requested.offset()
val threshold = previouslyRequestedOffset - allowedMaxDeltaFromRequest
// if there are records found outside the threshold:
// 1. Drop the records returned for the partition (they are outside the threshold)
// 2. Request a seek to the latest committed offset of the partition (known to be "safe").
// 3. New records for the partition arrive with the next poll, which should be within the threshold
if (recordsExceedThreshold(threshold, partitionRecords)) {
// requested and committed are assumed to be kept in-sync, so this _should_ be safe. Fails
// catastrophically if this is not the case
val committed = progress.committedOffsets(tp)
log.warning(
s"Dropping offsets for partition $tp - received an offset which is less than "
+ s"allowed $threshold from the last requested offset "
+ s"(threshold: $allowedMaxDeltaFromRequest). "
+ s"Seeking to the latest known safe (committed or assigned) offset: $committed"
)
consumer ! Seek(Map(tp -> committed.offset()))
None
} else {
Some((tp, partitionRecords))
}
case None =>
// this is a partition that we have no information on, so assume it is safe and continue; this is most likely
// due to a rebalance, in which case the consumer has already been reset to the committed offset, which is safe
Some((tp, partitionRecords))
}
})
.filter(_.isDefined)
.map(_.get)
.toMap
.asJava

new ConsumerRecords[K, V](safe)
}

/**
* Check whether the records' offsets fall outside (before) the specified offset threshold.
* @return `true` if the records in the batch have gone outside the threshold, `false` otherwise.
*/
private def recordsExceedThreshold[K, V](threshold: Long,
partitionRecords: util.List[ConsumerRecord[K, V]]): Boolean = {
var exceedThreshold = false
// rather than check all the records in the batch, trust that Kafka has given them to us in order, and just
// check the first and last offsets in the batch.
if (partitionRecords.size() > 0) {
exceedThreshold = partitionRecords.get(0).offset() < threshold
if (!exceedThreshold) {
exceedThreshold = partitionRecords.get(partitionRecords.size() - 1).offset() < threshold
}
}
exceedThreshold
}
}
}
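
Tying the pieces together, here is a hedged sketch of how the actor-side wiring could look; the standalone pollProtected helper and the 50 ms poll timeout are illustrative and not the actual KafkaConsumerActor code:

import java.time.Duration

import akka.actor.ActorRef
import akka.event.LoggingAdapter
import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecords}

// `self`, `log` and `consumer` stand in for the consumer actor's own references.
def pollProtected[K, V](self: ActorRef,
                        log: LoggingAdapter,
                        consumer: Consumer[K, V],
                        tracker: ConsumerProgressTracking,
                        threshold: Long): ConsumerRecords[K, V] = {
  // With threshold <= 0 this resolves to the Noop implementation, adding essentially no overhead.
  val protection = ConsumerResetProtection(self, log, threshold, () => tracker)

  // Partitions whose records jump back more than `threshold` before the last requested
  // offset are dropped from the batch, and a Seek to the committed offset is sent to `self`.
  protection.protect(consumer.poll(Duration.ofMillis(50)))
}

Passing the tracker as a thunk means the Noop path never touches it, which lines up with the goal of adding no overhead when neither commit refreshing nor reset tracking is enabled.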