Skip to content

Commit

Permalink
[MAPR-32290] Spark processing offsets when messages are already ttl in first batch (apache#368)
Browse files Browse the repository at this point in the history

* [MAPR-32290] Spark processing offsets when messages are already ttl in first batch
  • Loading branch information
ekrivokonmapr committed Nov 7, 2018
1 parent fddc84f commit b282a8b
Showing 1 changed file with 30 additions and 5 deletions.
Expand Up @@ -17,18 +17,16 @@

package org.apache.spark.streaming.kafka09

import java.{ util => ju }
import java.{util => ju}
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicReference

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
import org.apache.kafka.common.TopicPartition

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{StreamingContext, Time}
Expand Down Expand Up @@ -74,6 +72,14 @@ private[spark] class DirectKafkaInputDStream[K, V](
kc
}

/**
 * Builds a throwaway [[KafkaConsumer]] used by `start()` to probe the first
 * readable offset of each partition (e.g. when older messages have been
 * TTL-expired and the committed position is no longer valid).
 *
 * The consumer is configured to return at most one record per poll and is
 * registered under a derived group id (`<groupId>_assignGroup`) so its
 * commits/membership never interfere with the real streaming group.
 *
 * NOTE: the caller owns the returned consumer and is responsible for closing it.
 */
def consumerForAssign(): KafkaConsumer[Long, String] = this.synchronized {
  // Work on a defensive copy: mutating executorKafkaParams directly would
  // leak "max.poll.records=1" into the executor-side config and would append
  // "_assignGroup" to the shared group id again on every call.
  val properties = new ju.HashMap[String, Object](consumerStrategy.executorKafkaParams)
  properties.put("max.poll.records", "1")
  properties.put(ConsumerConfig.GROUP_ID_CONFIG,
    s"${properties.get(ConsumerConfig.GROUP_ID_CONFIG)}_assignGroup")
  new KafkaConsumer[Long, String](properties)
}

override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, V]] = {
logError("Kafka ConsumerRecord is not serializable. " +
"Use .map to extract fields before calling .persist or .window")
Expand Down Expand Up @@ -240,10 +246,29 @@ private[spark] class DirectKafkaInputDStream[K, V](

override def start(): Unit = {
val c = consumer
// Auxiliary consumer used only to discover the earliest *readable* offset per
// partition. NOTE(review): it is never closed in the visible code — looks like
// a consumer leak; confirm against the full method body.
val consumerAssign = consumerForAssign
// Driver-side poll timeout (ms) for the first-offset probe below.
val pollTimeout = ssc.sparkContext.getConf
.getLong("spark.streaming.kafka.consumer.driver.poll.ms", 120000)
paranoidPoll(c)
if (currentOffsets.isEmpty) {
// Seed currentOffsets from the consumer's assignment, but fast-forward any
// position that points at messages which no longer exist (e.g. already
// TTL-expired), so the first batch does not span unreadable offsets.
currentOffsets = c.assignment().asScala.map { tp =>
tp -> {
val position = c.position(tp)

// Probe the partition: with max.poll.records=1, the first polled record
// (if any) reveals the earliest offset that is still readable.
consumerAssign.assign(ju.Arrays.asList(tp))
val records = consumerAssign.poll(pollTimeout).iterator()
val firstRecordOffset = if (records.hasNext) {
records.next().offset()
} else {
// No record came back within pollTimeout: treat the partition as fully
// expired/empty and jump to its end offset. NOTE(review): a slow poll
// (timeout with data still present) would also take this branch and
// skip live data — verify the timeout is generous enough.
c.endOffsets(ju.Arrays.asList(tp)).get(tp).longValue()
}

// Never move backwards: keep the committed position unless it points
// before the first readable record.
if (position < firstRecordOffset) {
firstRecordOffset
} else {
position
}
}
}.toMap
}

Expand Down

0 comments on commit b282a8b

Please sign in to comment.