Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Kafka 0.10.0.1? #131

Open
jinhaicheng0107 opened this issue Apr 18, 2018 · 1 comment
Open

Support Kafka 0.10.0.1? #131

jinhaicheng0107 opened this issue Apr 18, 2018 · 1 comment

Comments

@jinhaicheng0107
Copy link

My source Kafka and destination Kafa version are both 0.10..0.1, I use uReplicator-master version, however it comes following exceptions.

[2018-04-18 15:14:43,004] INFO [CompactConsumerFetcherManager-1524033744239] Fetcher Thread for topic partitions: ArrayBuffer([[topicjhc,0], InitialOffset 0] ) is CompactConsumerFetcherThread-kloak-mirrormaker-test_kloakmms01-sjc1-0-0 (kafka.mirrormaker.CompactConsumerFetcherManager:70)
[2018-04-18 15:14:43,005] INFO [CompactConsumerFetcherManager-1524033744239] Added fetcher for partitions ArrayBuffer([[topicjhc,0], initOffset 0 to broker BrokerEndPoint(0,99.12.92.1,9092)] ) (kafka.mirrormaker.CompactConsumerFetcherManager:70)
[2018-04-18 15:14:43,048] INFO Reconnect due to error: (kafka.consumer.SimpleConsumer:78)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:85)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130)
at kafka.mirrormaker.CompactConsumerFetcherThread.processFetchRequest(CompactConsumerFetcherThread.scala:209)
at kafka.mirrormaker.CompactConsumerFetcherThread.doWork(CompactConsumerFetcherThread.scala:192)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
kafka.consumer.ConsumerTimeoutException
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:69)
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:64)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:56)
at kafka.mirrormaker.MirrorMakerWorker$MirrorMakerThread.run(MirrorMakerWorker.scala:288)
kafka.consumer.ConsumerTimeoutException
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:69)
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:64)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:56)
at kafka.mirrormaker.MirrorMakerWorker$MirrorMakerThread.run(MirrorMakerWorker.scala:288)

source Kafka broker config:

num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.retention.bytes=4294967296
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connection.timeout.ms=30000
zookeeper.session.timeout.ms=30000
auto.create.topics.enable=false
delete.topic.enable=true
message.max.bytes=1048576
offsets.retention.minutes=10080
num.replica.fetchers=2
broker.id=1
zookeeper.connect=99.12.92.4:2181,99.12.92.5:2181,99.12.92.6:2181
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
super.users=User:admin
listeners=PLAINTEXT://99.12.92.2:9092,SASL_PLAINTEXT://99.12.92.2:9093

uReplicator consumer config:

zookeeper.connect=99.12.92.4:2181
zookeeper.connection.timeout.ms=30000
zookeeper.session.timeout.ms=30000
group.id=kloak-mirrormaker-test
consumer.id=kloakmms01-sjc1
partition.assignment.strategy=roundrobin
socket.receive.buffer.bytes=1048576
fetch.message.max.bytes=8388608
queued.max.message.chunks=5
auto.offset.reset=smallest

@xhl1988
Copy link
Contributor

xhl1988 commented May 23, 2018

Synced with @jinhaicheng0107 through email. He did an awesome job comparing different versions:

uReplicator version uReplicator-kafka-0.8.2.1 uReplicator-master uReplicator-master uReplicator-master
kafka version kafka-0.10.0.1 kafka-0.10.0.1 kafka-0.10.0.1(without security configs) kafka-0.10.2.1
result running error is as before error is as before running

Looks like SimpleConsumer@0.10.2.1 doesn't work with Kafka@0.10.0.1 through KafkaConsumer should work. So the final solution is to downgrade Kafka version in uReplicator to 0.10.0.1 and it worked well after that. The diff to downgrade is as below:

diff --git a/pom.xml b/pom.xml
index d28f9a2..13030cd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -80,7 +80,7 @@
   <properties>
     <jdk.version>1.8</jdk.version>
     <scala.version>2.11.8</scala.version>
-    <kafka.version>0.10.2.1</kafka.version>
+    <kafka.version>0.10.0.1</kafka.version>
     <kafka.scala.version>2.11</kafka.scala.version>
     <helix.version>0.6.8</helix.version>
     <yammer.version>2.2.0</yammer.version>
diff --git a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/utils/KafkaStarterUtils.java b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/utils/KafkaStarterUtils.java
index 8468ff0..2ca540b 100644
--- a/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/utils/KafkaStarterUtils.java
+++ b/uReplicator-Controller/src/test/java/com/uber/stream/kafka/mirrormaker/controller/utils/KafkaStarterUtils.java
@@ -23,7 +23,6 @@ import kafka.server.KafkaServerStartable;
 import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.io.FileUtils;
-import org.apache.kafka.common.errors.TopicExistsException;
 
 
 /**
@@ -104,7 +103,7 @@ public class KafkaStarterUtils {
       ZkUtils zkUtils = ZkUtils.apply(zkStr, 30000, 30000, false);
       TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(args);
       TopicCommand.createTopic(zkUtils, opts);
-    } catch (TopicExistsException e) {
+    } catch (Exception e) {
       // Catch TopicExistsException otherwise it will break maven-surefire-plugin
       System.out.println("Topic already existed");
     }
diff --git a/uReplicator-Worker/src/main/scala/kafka/mirrormaker/CompactConsumerFetcherManager.scala b/uReplicator-Worker/src/main/scala/kafka/mirrormaker/CompactConsumerFetcherManager.scala
index bb54479..691e492 100644
--- a/uReplicator-Worker/src/main/scala/kafka/mirrormaker/CompactConsumerFetcherManager.scala
+++ b/uReplicator-Worker/src/main/scala/kafka/mirrormaker/CompactConsumerFetcherManager.scala
@@ -29,6 +29,7 @@ import kafka.server.{BrokerAndFetcherId, BrokerAndInitialOffset}
 import kafka.utils.CoreUtils._
 import kafka.utils.{Logging, ShutdownableThread, ZkUtils}
 import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.mutable.HashMap
@@ -310,7 +311,7 @@ class CompactConsumerFetcherManager(private val consumerIdString: String,
         }
 
         info("Partitions without leader %s".format(noLeaderPartitionSet))
-        val brokers = ClientUtils.getPlaintextBrokerEndPoints(ZkUtils.apply(zkClient, true))
+        val brokers = ZkUtils.apply(zkClient, true).getAllBrokerEndPointsForChannel(SecurityProtocol.PLAINTEXT)
 
         val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet,
           brokers,

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants