[SPARK-34187][SS] Use available offset range obtained during polling when checking offset validation

### What changes were proposed in this pull request?

This patch uses the available offset range obtained while polling Kafka records to do the offset validation check.
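In outline, the fetch path now returns the offset range Kafka reported at poll time alongside the polled records, and the pre-fetched data keeps that snapshot for later validation instead of querying Kafka again. A simplified sketch of the idea (`FetchedBatch`, `FetchSketch`, and the function parameters are illustrative names, not the actual Spark classes shown in the diff below):

```scala
// Simplified sketch; the real classes live in org.apache.spark.sql.kafka010.consumer
// (InternalKafkaConsumer, FetchedData) and carry more state than shown here.
case class AvailableOffsetRange(earliest: Long, latest: Long)

case class FetchedBatch(
    records: Seq[(Long, Array[Byte])],          // (offset, value) pairs returned by poll
    offsetAfterPoll: Long,                      // next offset to poll from
    availableOffsetRange: AvailableOffsetRange) // range reported by Kafka at poll time

object FetchSketch {
  def fetch(
      poll: Long => (Seq[(Long, Array[Byte])], Long),
      currentRange: () => AvailableOffsetRange,
      offset: Long): FetchedBatch = {
    val (records, offsetAfterPoll) = poll(offset)
    // Capture the available range now, while it still matches the polled records,
    // rather than looking it up again later when the validation check runs.
    FetchedBatch(records, offsetAfterPoll, currentRange())
  }
}
```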

### Why are the changes needed?

We have supported non-consecutive offsets for Kafka since 2.4.0. In `fetchRecord`, we do offset validation by checking whether the offset is in the available offset range. But currently we obtain the latest available offset range to do the check. This is not correct, because the available offset range can change during the batch, so it may differ from the range that was available when we polled the records from Kafka.

It is possible that an offset is valid when polling, but by the time we do the above check it falls outside the latest available offset range. We would then wrongly treat it as a data loss case and fail the query or drop the record.
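A minimal illustration of the failure mode and the fix (hypothetical `OffsetRange`/`isDataLoss` names; the real check lives in `KafkaDataConsumer.fetchRecord` and `InternalKafkaConsumer.fetch`): an offset that was inside the range at poll time can fall outside the latest range by the time validation runs, for example after retention removed old segments.

```scala
// Hypothetical illustration, not the actual Spark implementation.
case class OffsetRange(earliest: Long, latest: Long)

object OffsetValidationSketch {
  // Treat `offset` as data loss if it lies outside the given range.
  def isDataLoss(offset: Long, range: OffsetRange): Boolean =
    offset < range.earliest || offset >= range.latest

  def main(args: Array[String]): Unit = {
    val offset = 100L
    val rangeAtPollTime = OffsetRange(earliest = 90L, latest = 200L)  // offset was valid when polled
    val latestRangeNow = OffsetRange(earliest = 150L, latest = 260L)  // retention advanced the earliest offset

    // Old behaviour: validate against the latest range -> spurious data-loss failure.
    assert(isDataLoss(offset, latestRangeNow))
    // New behaviour: validate against the poll-time snapshot -> the offset is accepted.
    assert(!isDataLoss(offset, rangeAtPollTime))
  }
}
```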

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

This should pass existing unit tests.

It is hard to write a unit test for this because the Kafka producer and consumer are asynchronous; we would also need to force the offset to fall outside the new available offset range.

Closes apache#31275 from viirya/SPARK-34187.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
viirya authored and skestle committed Feb 3, 2021
1 parent faa623a commit d909056
Showing 3 changed files with 30 additions and 17 deletions.
@@ -28,7 +28,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.kafka010.{FETCHED_DATA_CACHE_EVICTOR_THREAD_RUN_INTERVAL, FETCHED_DATA_CACHE_TIMEOUT}
import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{CacheKey, UNKNOWN_OFFSET}
import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{AvailableOffsetRange, CacheKey, UNKNOWN_OFFSET}
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}

/**
@@ -174,7 +174,8 @@ private[consumer] object FetchedDataPool {
val emptyData = FetchedData(
ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
UNKNOWN_OFFSET,
UNKNOWN_OFFSET)
UNKNOWN_OFFSET,
AvailableOffsetRange(UNKNOWN_OFFSET, UNKNOWN_OFFSET))

CachedFetchedData(emptyData)
}
@@ -71,7 +71,7 @@ private[kafka010] class InternalKafkaConsumer(
* consumer polls nothing before timeout.
*/
def fetch(offset: Long, pollTimeoutMs: Long):
(ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = {
(ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long, AvailableOffsetRange) = {

// Seek to the offset because we may call seekToBeginning or seekToEnd before this.
seek(offset)
@@ -80,15 +80,15 @@
logDebug(s"Polled $groupId ${p.partitions()} ${r.size}")
val offsetAfterPoll = consumer.position(topicPartition)
logDebug(s"Offset changed from $offset to $offsetAfterPoll after polling")
val fetchedData = (r, offsetAfterPoll)
val range = getAvailableOffsetRange()
val fetchedData = (r, offsetAfterPoll, range)
if (r.isEmpty) {
// We cannot fetch anything after `poll`. Three possible cases:
// - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will
//   be thrown.
// - Cannot fetch any data before timeout. `TimeoutException` will be thrown.
// - Fetched something but all of them are not visible. This is a valid case and we let the
//   caller handle this.
val range = getAvailableOffsetRange()
if (offset < range.earliest || offset >= range.latest) {
throw new OffsetOutOfRangeException(
Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
@@ -142,18 +142,22 @@ private[kafka010] class InternalKafkaConsumer(
* should check if the pre-fetched data is still valid.
* @param _offsetAfterPoll the Kafka offset after calling `poll`. We will use this offset to
* poll when `records` is drained.
* @param _availableOffsetRange the available offset range in Kafka when polling the records.
*/
private[consumer] case class FetchedData(
private var _records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
private var _nextOffsetInFetchedData: Long,
private var _offsetAfterPoll: Long) {
private var _offsetAfterPoll: Long,
private var _availableOffsetRange: AvailableOffsetRange) {

def withNewPoll(
records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
offsetAfterPoll: Long): FetchedData = {
offsetAfterPoll: Long,
availableOffsetRange: AvailableOffsetRange): FetchedData = {
this._records = records
this._nextOffsetInFetchedData = UNKNOWN_OFFSET
this._offsetAfterPoll = offsetAfterPoll
this._availableOffsetRange = availableOffsetRange
this
}

@@ -180,6 +184,7 @@ private[consumer] case class FetchedData(
_records = ju.Collections.emptyListIterator()
_nextOffsetInFetchedData = UNKNOWN_OFFSET
_offsetAfterPoll = UNKNOWN_OFFSET
_availableOffsetRange = AvailableOffsetRange(UNKNOWN_OFFSET, UNKNOWN_OFFSET)
}

/**
@@ -192,6 +197,13 @@ private[consumer] case class FetchedData(
* Returns the next offset to poll after draining the pre-fetched records.
*/
def offsetAfterPoll: Long = _offsetAfterPoll

/**
* Returns the earliest and latest offsets as a tuple; this is the offset range that was
* available when the records were polled.
*/
def availableOffsetRange: (Long, Long) =
(_availableOffsetRange.earliest, _availableOffsetRange.latest)
}

/**
@@ -474,8 +486,8 @@ private[kafka010] class KafkaDataConsumer(
// In general, Kafka uses the specified offset as the start point, and tries to fetch the next
// available offset. Hence we need to handle offset mismatch.
if (record.offset > offset) {
val range = consumer.getAvailableOffsetRange()
if (range.earliest <= offset) {
val (earliestOffset, _) = fetchedData.availableOffsetRange
if (earliestOffset <= offset) {
// `offset` is still valid but the corresponding message is invisible. We should skip it
// and jump to `record.offset`. Here we move `fetchedData` back so that the next call of
// `fetchRecord` can just return `record` directly.
@@ -524,8 +536,8 @@ private[kafka010] class KafkaDataConsumer(
fetchedData: FetchedData,
offset: Long,
pollTimeoutMs: Long): Unit = {
val (records, offsetAfterPoll) = consumer.fetch(offset, pollTimeoutMs)
fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
val (records, offsetAfterPoll, range) = consumer.fetch(offset, pollTimeoutMs)
fetchedData.withNewPoll(records.listIterator, offsetAfterPoll, range)
}

private[kafka010] def getOrRetrieveConsumer(): InternalKafkaConsumer = {
@@ -30,7 +30,7 @@ import org.scalatest.PrivateMethodTester

import org.apache.spark.SparkConf
import org.apache.spark.sql.kafka010.{FETCHED_DATA_CACHE_EVICTOR_THREAD_RUN_INTERVAL, FETCHED_DATA_CACHE_TIMEOUT}
import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.CacheKey
import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{AvailableOffsetRange, CacheKey}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.ManualClock

@@ -69,7 +69,7 @@ class FetchedDataPoolSuite extends SharedSparkSession with PrivateMethodTester {
assertFetchedDataPoolStatistic(dataPool, expectedNumCreated = 10, expectedNumTotal = 10)

dataList.map { case (_, data) =>
data.withNewPoll(testRecords(0, 5).listIterator, 5)
data.withNewPoll(testRecords(0, 5).listIterator, 5, AvailableOffsetRange(0, 4))
}

dataList.foreach { case (key, data) =>
@@ -91,7 +91,7 @@ class FetchedDataPoolSuite extends SharedSparkSession with PrivateMethodTester {
val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))

val data = dataPool.acquire(cacheKey, 0)
data.withNewPoll(testRecords(0, 5).listIterator, 5)
data.withNewPoll(testRecords(0, 5).listIterator, 5, AvailableOffsetRange(0, 4))

(0 to 3).foreach { _ => data.next() }

@@ -130,14 +130,14 @@ class FetchedDataPoolSuite extends SharedSparkSession with PrivateMethodTester {
assert(getCache(dataPool)(cacheKey)(1).inUse)

// reading from task 1
dataFromTask1.withNewPoll(testRecords(0, 5).listIterator, 5)
dataFromTask1.withNewPoll(testRecords(0, 5).listIterator, 5, AvailableOffsetRange(0, 4))

(0 to 3).foreach { _ => dataFromTask1.next() }

dataPool.release(cacheKey, dataFromTask1)

// reading from task 2
dataFromTask2.withNewPoll(testRecords(0, 30).listIterator, 30)
dataFromTask2.withNewPoll(testRecords(0, 30).listIterator, 30, AvailableOffsetRange(0, 29))

(0 to 5).foreach { _ => dataFromTask2.next() }

@@ -189,7 +189,7 @@ class FetchedDataPoolSuite extends SharedSparkSession with PrivateMethodTester {
assertFetchedDataPoolStatistic(dataPool, expectedNumCreated = 10, expectedNumTotal = 10)

dataList.map { case (_, data) =>
data.withNewPoll(testRecords(0, 5).listIterator, 5)
data.withNewPoll(testRecords(0, 5).listIterator, 5, AvailableOffsetRange(0, 4))
}

val dataToEvict = dataList.take(3)
