From d909056f20c49c64f052067dfd66e8514152e554 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 24 Jan 2021 11:50:56 -0800 Subject: [PATCH] [SPARK-34187][SS] Use available offset range obtained during polling when checking offset validation ### What changes were proposed in this pull request? This patch uses the available offset range obtained during polling Kafka records to do offset validation check. ### Why are the changes needed? We support non-consecutive offsets for Kafka since 2.4.0. In `fetchRecord`, we do offset validation by checking if the offset is in available offset range. But currently we obtain latest available offset range to do the check. It looks not correct as the available offset range could be changed during the batch, so the available offset range is different than the one when we polling the records from Kafka. It is possible that an offset is valid when polling, but at the time we do the above check, it is out of latest available offset range. We will wrongly consider it as data loss case and fail the query or drop the record. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? This should pass existing unit tests. This is hard to have unit test as the Kafka producer and the consumer is asynchronous. Further, we also need to make the offset out of new available offset range. Closes #31275 from viirya/SPARK-34187. Authored-by: Liang-Chi Hsieh Signed-off-by: Dongjoon Hyun --- .../kafka010/consumer/FetchedDataPool.scala | 5 ++-- .../kafka010/consumer/KafkaDataConsumer.scala | 30 +++++++++++++------ .../consumer/FetchedDataPoolSuite.scala | 12 ++++---- 3 files changed, 30 insertions(+), 17 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala index 6174bfb203429..3e6831770a674 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala @@ -28,7 +28,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.sql.kafka010.{FETCHED_DATA_CACHE_EVICTOR_THREAD_RUN_INTERVAL, FETCHED_DATA_CACHE_TIMEOUT} -import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{CacheKey, UNKNOWN_OFFSET} +import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{AvailableOffsetRange, CacheKey, UNKNOWN_OFFSET} import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} /** @@ -174,7 +174,8 @@ private[consumer] object FetchedDataPool { val emptyData = FetchedData( ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]], UNKNOWN_OFFSET, - UNKNOWN_OFFSET) + UNKNOWN_OFFSET, + AvailableOffsetRange(UNKNOWN_OFFSET, UNKNOWN_OFFSET)) CachedFetchedData(emptyData) } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala index 54f6f91828055..5c92d110a630f 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala @@ -71,7 +71,7 @@ private[kafka010] class InternalKafkaConsumer( * consumer polls nothing before timeout. */ def fetch(offset: Long, pollTimeoutMs: Long): - (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = { + (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long, AvailableOffsetRange) = { // Seek to the offset because we may call seekToBeginning or seekToEnd before this. seek(offset) @@ -80,7 +80,8 @@ private[kafka010] class InternalKafkaConsumer( logDebug(s"Polled $groupId ${p.partitions()} ${r.size}") val offsetAfterPoll = consumer.position(topicPartition) logDebug(s"Offset changed from $offset to $offsetAfterPoll after polling") - val fetchedData = (r, offsetAfterPoll) + val range = getAvailableOffsetRange() + val fetchedData = (r, offsetAfterPoll, range) if (r.isEmpty) { // We cannot fetch anything after `poll`. Two possible cases: // - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will @@ -88,7 +89,6 @@ private[kafka010] class InternalKafkaConsumer( // - Cannot fetch any data before timeout. `TimeoutException` will be thrown. // - Fetched something but all of them are not invisible. This is a valid case and let the // caller handles this. - val range = getAvailableOffsetRange() if (offset < range.earliest || offset >= range.latest) { throw new OffsetOutOfRangeException( Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava) @@ -142,18 +142,22 @@ private[kafka010] class InternalKafkaConsumer( * should check if the pre-fetched data is still valid. * @param _offsetAfterPoll the Kafka offset after calling `poll`. We will use this offset to * poll when `records` is drained. + * @param _availableOffsetRange the available offset range in Kafka when polling the records. */ private[consumer] case class FetchedData( private var _records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]], private var _nextOffsetInFetchedData: Long, - private var _offsetAfterPoll: Long) { + private var _offsetAfterPoll: Long, + private var _availableOffsetRange: AvailableOffsetRange) { def withNewPoll( records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]], - offsetAfterPoll: Long): FetchedData = { + offsetAfterPoll: Long, + availableOffsetRange: AvailableOffsetRange): FetchedData = { this._records = records this._nextOffsetInFetchedData = UNKNOWN_OFFSET this._offsetAfterPoll = offsetAfterPoll + this._availableOffsetRange = availableOffsetRange this } @@ -180,6 +184,7 @@ private[consumer] case class FetchedData( _records = ju.Collections.emptyListIterator() _nextOffsetInFetchedData = UNKNOWN_OFFSET _offsetAfterPoll = UNKNOWN_OFFSET + _availableOffsetRange = AvailableOffsetRange(UNKNOWN_OFFSET, UNKNOWN_OFFSET) } /** @@ -192,6 +197,13 @@ private[consumer] case class FetchedData( * Returns the next offset to poll after draining the pre-fetched records. */ def offsetAfterPoll: Long = _offsetAfterPoll + + /** + * Returns the tuple of earliest and latest offsets that is the available offset range when + * polling the records. + */ + def availableOffsetRange: (Long, Long) = + (_availableOffsetRange.earliest, _availableOffsetRange.latest) } /** @@ -474,8 +486,8 @@ private[kafka010] class KafkaDataConsumer( // In general, Kafka uses the specified offset as the start point, and tries to fetch the next // available offset. Hence we need to handle offset mismatch. if (record.offset > offset) { - val range = consumer.getAvailableOffsetRange() - if (range.earliest <= offset) { + val (earliestOffset, _) = fetchedData.availableOffsetRange + if (earliestOffset <= offset) { // `offset` is still valid but the corresponding message is invisible. We should skip it // and jump to `record.offset`. Here we move `fetchedData` back so that the next call of // `fetchRecord` can just return `record` directly. @@ -524,8 +536,8 @@ private[kafka010] class KafkaDataConsumer( fetchedData: FetchedData, offset: Long, pollTimeoutMs: Long): Unit = { - val (records, offsetAfterPoll) = consumer.fetch(offset, pollTimeoutMs) - fetchedData.withNewPoll(records.listIterator, offsetAfterPoll) + val (records, offsetAfterPoll, range) = consumer.fetch(offset, pollTimeoutMs) + fetchedData.withNewPoll(records.listIterator, offsetAfterPoll, range) } private[kafka010] def getOrRetrieveConsumer(): InternalKafkaConsumer = { diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPoolSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPoolSuite.scala index 23bab5cd48083..09d50ef0660cf 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPoolSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPoolSuite.scala @@ -30,7 +30,7 @@ import org.scalatest.PrivateMethodTester import org.apache.spark.SparkConf import org.apache.spark.sql.kafka010.{FETCHED_DATA_CACHE_EVICTOR_THREAD_RUN_INTERVAL, FETCHED_DATA_CACHE_TIMEOUT} -import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.CacheKey +import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{AvailableOffsetRange, CacheKey} import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.util.ManualClock @@ -69,7 +69,7 @@ class FetchedDataPoolSuite extends SharedSparkSession with PrivateMethodTester { assertFetchedDataPoolStatistic(dataPool, expectedNumCreated = 10, expectedNumTotal = 10) dataList.map { case (_, data) => - data.withNewPoll(testRecords(0, 5).listIterator, 5) + data.withNewPoll(testRecords(0, 5).listIterator, 5, AvailableOffsetRange(0, 4)) } dataList.foreach { case (key, data) => @@ -91,7 +91,7 @@ class FetchedDataPoolSuite extends SharedSparkSession with PrivateMethodTester { val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0)) val data = dataPool.acquire(cacheKey, 0) - data.withNewPoll(testRecords(0, 5).listIterator, 5) + data.withNewPoll(testRecords(0, 5).listIterator, 5, AvailableOffsetRange(0, 4)) (0 to 3).foreach { _ => data.next() } @@ -130,14 +130,14 @@ class FetchedDataPoolSuite extends SharedSparkSession with PrivateMethodTester { assert(getCache(dataPool)(cacheKey)(1).inUse) // reading from task 1 - dataFromTask1.withNewPoll(testRecords(0, 5).listIterator, 5) + dataFromTask1.withNewPoll(testRecords(0, 5).listIterator, 5, AvailableOffsetRange(0, 4)) (0 to 3).foreach { _ => dataFromTask1.next() } dataPool.release(cacheKey, dataFromTask1) // reading from task 2 - dataFromTask2.withNewPoll(testRecords(0, 30).listIterator, 30) + dataFromTask2.withNewPoll(testRecords(0, 30).listIterator, 30, AvailableOffsetRange(0, 29)) (0 to 5).foreach { _ => dataFromTask2.next() } @@ -189,7 +189,7 @@ class FetchedDataPoolSuite extends SharedSparkSession with PrivateMethodTester { assertFetchedDataPoolStatistic(dataPool, expectedNumCreated = 10, expectedNumTotal = 10) dataList.map { case (_, data) => - data.withNewPoll(testRecords(0, 5).listIterator, 5) + data.withNewPoll(testRecords(0, 5).listIterator, 5, AvailableOffsetRange(0, 4)) } val dataToEvict = dataList.take(3)