From d8e636025aea00a9ae5380492e7831fcc26fb440 Mon Sep 17 00:00:00 2001 From: Zhang Zhichao <441586683@qq.com> Date: Sat, 29 Sep 2018 00:17:29 +0800 Subject: [PATCH] [EXT][SPARK-21168] KafkaRDD should always set kafka clientId. #19887 https://issues.apache.org/jira/browse/SPARK-21168 There are no a number of other places that a client ID should be set,and I think we should use consumer.clientId in the clientId method,because the fetch request will be used by the same consumer behind. --- .../main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala index 5ea52b6ad36a0..791cf0efaf888 100644 --- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala +++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala @@ -191,6 +191,7 @@ class KafkaRDD[ private def fetchBatch: Iterator[MessageAndOffset] = { val req = new FetchRequestBuilder() + .clientId(consumer.clientId) .addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes) .build() val resp = consumer.fetch(req)