Merge pull request #684 from zalando/feature/aruha-876
aruha-876: kafka client 10.1
adyach committed Aug 1, 2017
2 parents 50ef88a + 647d7eb commit 00a73a7
Showing 5 changed files with 25 additions and 23 deletions.
4 changes: 2 additions & 2 deletions build.gradle
@@ -150,8 +150,8 @@ dependencies {
     compile 'org.echocat.jomon:runtime:1.6.3'
 
     // kafka & zookeeper
-    compile 'org.apache.kafka:kafka-clients:0.9.0.1'
-    compile 'org.apache.kafka:kafka_2.11:0.9.0.1'
+    compile 'org.apache.kafka:kafka-clients:0.10.1.0'
+    compile 'org.apache.kafka:kafka_2.11:0.10.1.0'
     compile 'org.apache.curator:curator-framework:2.12.0'
     compile 'org.apache.curator:curator-recipes:2.12.0'
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -38,7 +38,7 @@ services:
       - "2181:2181"
 
   kafka:
-    image: wurstmeister/kafka:0.9.0.0
+    image: wurstmeister/kafka:0.10.1.0
     network_mode: "host"
     ports:
      - "9092:9092"
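The broker image moves to 0.10.1.0 in the same commit as the client libraries for a reason: before Kafka 0.10.2, the Java clients required a broker at least as new as themselves, so a 0.10.1 client could not talk to the 0.9 broker this compose file used to start.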
4 changes: 2 additions & 2 deletions gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
-#Fri Feb 24 13:27:52 CET 2017
+#Tue Jun 20 11:33:10 CEST 2017
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-2.14.1-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-2.14.1-all.zip
@@ -1,6 +1,7 @@
 package org.zalando.nakadi.repository.kafka;
 
 import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
 import kafka.server.ConfigType;
 import kafka.utils.ZkUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -87,7 +88,7 @@ public List<Cursor> getNextOffsets(final String topic) {
                 .collect(Collectors.toList());
 
         consumer.assign(partitions);
-        consumer.seekToEnd(partitions.toArray(new TopicPartition[partitions.size()]));
+        consumer.seekToEnd(partitions);
 
         return partitions
                 .stream()
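This is the 0.9 → 0.10 consumer API change that recurs throughout the commit: seekToBeginning and seekToEnd dropped their varargs TopicPartition... signatures in favour of Collection<TopicPartition>, so the array conversion is no longer needed. A minimal sketch of the new shape against kafka-clients 0.10.1.0 (the bootstrap address and topic name are illustrative assumptions):

import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekToEndSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            final List<TopicPartition> partitions = consumer.partitionsFor("example-topic").stream()
                    .map(p -> new TopicPartition(p.topic(), p.partition()))
                    .collect(Collectors.toList());
            consumer.assign(partitions);
            consumer.seekToEnd(partitions); // 0.10.x takes the collection directly
            for (final TopicPartition tp : partitions) {
                // position() forces the lazy seek and returns the end offset
                System.out.println(tp + " -> " + consumer.position(tp));
            }
        }
    }
}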
@@ -100,7 +101,7 @@ public void createTopic(final String topic, final String zkUrl) {
         ZkUtils zkUtils = null;
         try {
             zkUtils = ZkUtils.apply(zkUrl, 30000, 10000, false);
-            AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties());
+            AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
         }
         finally {
             if (zkUtils != null) {
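The extra argument exists because AdminUtils.createTopic grew a RackAwareMode parameter in 0.10.x; RackAwareMode.Enforced$.MODULE$ is how Java code names the Scala case object. A self-contained sketch of the call as used above (ZooKeeper address and topic name are assumptions; the timeouts mirror the diff):

import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;

public class CreateTopicSketch {
    public static void main(final String[] args) {
        ZkUtils zkUtils = null;
        try {
            // session timeout 30s, connection timeout 10s, no secured ZK ACLs
            zkUtils = ZkUtils.apply("localhost:2181", 30000, 10000, false);
            // 1 partition, replication factor 1, default topic config,
            // rack-aware replica assignment enforced (the new 0.10.x argument)
            AdminUtils.createTopic(zkUtils, "example-topic", 1, 1, new Properties(),
                    RackAwareMode.Enforced$.MODULE$);
        } finally {
            if (zkUtils != null) {
                zkUtils.close();
            }
        }
    }
}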
@@ -2,7 +2,7 @@
 
 import com.google.common.base.Preconditions;
 import kafka.admin.AdminUtils;
-import kafka.common.TopicExistsException;
+import kafka.admin.RackAwareMode;
 import kafka.server.ConfigType;
 import kafka.utils.ZkUtils;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -13,6 +13,7 @@
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.NetworkException;
 import org.apache.kafka.common.errors.NotLeaderForPartitionException;
+import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.echocat.jomon.runtime.concurrent.RetryForSpecifiedTimeStrategy;
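Taken together, these two import hunks are the relocation of TopicExistsException: the Scala-core kafka.common.TopicExistsException of 0.9.x is gone, and 0.10.x throws org.apache.kafka.common.errors.TopicExistsException from the Java clients package instead. The catch sites below keep their shape; only the import changes. A compact sketch (the helper name is hypothetical):

import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;
// 0.9.x location was kafka.common.TopicExistsException
import org.apache.kafka.common.errors.TopicExistsException;

public class TopicExistsSketch {
    // hypothetical helper: create a topic, tolerating a concurrent creation
    static void createIfAbsent(final ZkUtils zkUtils, final String topic) {
        try {
            AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties(),
                    RackAwareMode.Enforced$.MODULE$);
        } catch (final TopicExistsException e) {
            // topic is already there: fine for this sketch's purposes
        }
    }
}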
@@ -43,7 +44,7 @@
 import org.zalando.nakadi.util.UUIDGenerator;
 
 import javax.annotation.Nullable;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -130,7 +131,8 @@ private void createTopic(final String topic, final int partitionsNum, final int
             final Properties topicConfig = new Properties();
             topicConfig.setProperty("retention.ms", Long.toString(retentionMs));
             topicConfig.setProperty("segment.ms", Long.toString(rotationMs));
-            AdminUtils.createTopic(zkUtils, topic, partitionsNum, replicaFactor, topicConfig);
+            AdminUtils.createTopic(zkUtils, topic, partitionsNum, replicaFactor, topicConfig,
+                    RackAwareMode.Enforced$.MODULE$);
         });
     } catch (final TopicExistsException e) {
         throw new TopicCreationException("Topic with name " + topic +
@@ -308,20 +310,19 @@ private void failUnpublished(final List<BatchItem> batch, final String reason) {
     public Optional<PartitionStatistics> loadPartitionStatistics(final Timeline timeline, final String partition)
             throws ServiceUnavailableException {
         try (Consumer<byte[], byte[]> consumer = kafkaFactory.getConsumer()) {
-            final List<PartitionInfo> topicPartitions = consumer.partitionsFor(timeline.getTopic());
-
-            final Optional<PartitionInfo> tp = topicPartitions.stream()
+            final Optional<PartitionInfo> tp = consumer.partitionsFor(timeline.getTopic()).stream()
                     .filter(p -> KafkaCursor.toNakadiPartition(p.partition()).equals(partition))
                     .findAny();
             if (!tp.isPresent()) {
                 return Optional.empty();
             }
             final TopicPartition kafkaTP = tp.map(v -> new TopicPartition(v.topic(), v.partition())).get();
+            final Collection<TopicPartition> topicPartitions = Collections.singletonList(kafkaTP);
             consumer.assign(Collections.singletonList(kafkaTP));
-            consumer.seekToBeginning(kafkaTP);
+            consumer.seekToBeginning(topicPartitions);
 
             final long begin = consumer.position(kafkaTP);
-            consumer.seekToEnd(kafkaTP);
+            consumer.seekToEnd(topicPartitions);
             final long end = consumer.position(kafkaTP);
 
             return Optional.of(new KafkaPartitionStatistics(timeline, kafkaTP.partition(), begin, end - 1));
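Note how the statistics are derived: after seekToBeginning, position(kafkaTP) yields the earliest available offset, and after seekToEnd it yields the offset one past the last record, which is why KafkaPartitionStatistics is built with begin and end - 1 (the offset of the newest event, or begin - 1 for an empty partition).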
@@ -342,18 +343,18 @@ public List<PartitionStatistics> loadTopicStatistics(final Collection<Timeline>
                         .map(p -> new TopicPartition(p.topic(), p.partition()))
                         .forEach(tp -> backMap.put(tp, timeline));
             }
-            final TopicPartition[] kafkaTPs = backMap.keySet().toArray(new TopicPartition[backMap.size()]);
-            consumer.assign(Arrays.asList(kafkaTPs));
+            final List<TopicPartition> kafkaTPs = new ArrayList<>(backMap.keySet());
+            consumer.assign(kafkaTPs);
             consumer.seekToBeginning(kafkaTPs);
-            final long[] begins = Stream.of(kafkaTPs).mapToLong(consumer::position).toArray();
+            final long[] begins = kafkaTPs.stream().mapToLong(consumer::position).toArray();
 
             consumer.seekToEnd(kafkaTPs);
-            final long[] ends = Stream.of(kafkaTPs).mapToLong(consumer::position).toArray();
+            final long[] ends = kafkaTPs.stream().mapToLong(consumer::position).toArray();
 
-            return IntStream.range(0, kafkaTPs.length)
+            return IntStream.range(0, kafkaTPs.size())
                     .mapToObj(i -> new KafkaPartitionStatistics(
-                            backMap.get(kafkaTPs[i]),
-                            kafkaTPs[i].partition(),
+                            backMap.get(kafkaTPs.get(i)),
+                            kafkaTPs.get(i).partition(),
                             begins[i],
                             ends[i] - 1))
                     .collect(toList());
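The TopicPartition[] array becomes an ArrayList here because every consumer call it feeds (assign, seekToBeginning, seekToEnd) now expects a Collection<TopicPartition>; indexing switches from kafkaTPs[i] to kafkaTPs.get(i) to match, while the begins/ends long arrays can stay as they are.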
@@ -375,7 +376,7 @@ public List<PartitionEndStatistics> loadTopicEndStatistics(final Collection<Timeline>
             }
             final List<TopicPartition> kafkaTPs = newArrayList(backMap.keySet());
             consumer.assign(kafkaTPs);
-            consumer.seekToEnd(kafkaTPs.toArray(new TopicPartition[kafkaTPs.size()]));
+            consumer.seekToEnd(kafkaTPs);
             return backMap.entrySet().stream()
                     .map(e -> {
                         final TopicPartition tp = e.getKey();
