Skip to content

Commit

Permalink
Revert "Revert "Update Kafka Clients to 2.3.0""
Browse files Browse the repository at this point in the history
  • Loading branch information
ferbncode committed Feb 24, 2020
1 parent 2218cbd commit e129547
Show file tree
Hide file tree
Showing 15 changed files with 103 additions and 67 deletions.
1 change: 1 addition & 0 deletions acceptance-test/build.gradle
Expand Up @@ -83,6 +83,7 @@ dependencies {
compile "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$jacksonVersion"
compile "com.fasterxml.jackson.datatype:jackson-datatype-joda:$jacksonVersion"
compile "com.fasterxml.jackson.module:jackson-module-afterburner:$jacksonVersion"
compile "com.fasterxml.jackson.module:jackson-module-scala_2.12:$jacksonVersion"
compile 'org.zalando:jackson-datatype-problem:0.22.0'
compile 'org.zalando:problem:0.22.0'
compile 'org.json:json:20180130'
Expand Down
Expand Up @@ -46,6 +46,7 @@ public class KafkaRepositoryAT extends BaseAT {
private static final int DEFAULT_COMMIT_TIMEOUT = 60;
private static final int ZK_SESSION_TIMEOUT = 30000;
private static final int ZK_CONNECTION_TIMEOUT = 10000;
private static final int ZK_MAX_IN_FLIGHT_REQUESTS = 1000;
private static final int NAKADI_SEND_TIMEOUT = 10000;
private static final int NAKADI_POLL_TIMEOUT = 10000;
private static final Long DEFAULT_RETENTION_TIME = 100L;
Expand Down Expand Up @@ -101,7 +102,7 @@ public void setup() {
kafkaSettings = new KafkaSettings(KAFKA_REQUEST_TIMEOUT, KAFKA_BATCH_SIZE, KAFKA_BUFFER_MEMORY,
KAFKA_LINGER_MS, KAFKA_ENABLE_AUTO_COMMIT, KAFKA_MAX_REQUEST_SIZE,
KAFKA_DELIVERY_TIMEOUT, KAFKA_MAX_BLOCK_TIMEOUT);
zookeeperSettings = new ZookeeperSettings(ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT);
zookeeperSettings = new ZookeeperSettings(ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, ZK_MAX_IN_FLIGHT_REQUESTS);
kafkaHelper = new KafkaTestHelper(KAFKA_URL);
defaultTopicConfig = new NakadiTopicConfig(DEFAULT_PARTITION_COUNT, DEFAULT_CLEANUP_POLICY,
Optional.of(DEFAULT_RETENTION_TIME));
Expand Down
@@ -1,14 +1,16 @@
package org.zalando.nakadi.repository.kafka;

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.server.ConfigType;
import kafka.utils.ZkUtils;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import kafka.zookeeper.ZooKeeperClient;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.zalando.nakadi.view.Cursor;

import java.util.List;
Expand Down Expand Up @@ -98,17 +100,29 @@ public List<Cursor> getNextOffsets(final String topic) {
}

public void createTopic(final String topic, final String zkUrl) {
ZkUtils zkUtils = null;
try {
zkUtils = ZkUtils.apply(zkUrl, 30000, 10000, false);
AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties(), RackAwareMode.Safe$.MODULE$);
} finally {
if (zkUtils != null) {
zkUtils.close();
}
try (KafkaZkClient zkClient = createZkClient(zkUrl)) {
final AdminZkClient adminZkClient = new AdminZkClient(zkClient);
adminZkClient.createTopic(topic, 1, 1,
new Properties(), RackAwareMode.Safe$.MODULE$);
}
}

private static KafkaZkClient createZkClient(final String zkUrl) {
return new KafkaZkClient(
new ZooKeeperClient(
zkUrl,
30000,
10000,
1000,
Time.SYSTEM,
"dummyMetricGroup",
"dummyMetricType"
),
false,
Time.SYSTEM
);
}

public static Long getTopicRetentionTime(final String topic, final String zkPath) {
return Long.valueOf(getTopicProperty(topic, zkPath, "retention.ms"));
}
Expand All @@ -118,8 +132,10 @@ public static String getTopicCleanupPolicy(final String topic, final String zkPa
}

public static String getTopicProperty(final String topic, final String zkPath, final String propertyName) {
final ZkUtils zkUtils = ZkUtils.apply(zkPath, 30000, 10000, false);
final Properties topicConfig = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), topic);
return topicConfig.getProperty(propertyName);
try (KafkaZkClient zkClient = createZkClient(zkPath)) {
final AdminZkClient adminZkClient = new AdminZkClient(zkClient);
final Properties topicConfig = adminZkClient.fetchEntityConfig(ConfigType.Topic(), topic);
return topicConfig.getProperty(propertyName);
}
}
}
5 changes: 3 additions & 2 deletions api-consumption/build.gradle
Expand Up @@ -71,6 +71,7 @@ dependencies {
compile "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$jacksonVersion"
compile "com.fasterxml.jackson.datatype:jackson-datatype-joda:$jacksonVersion"
compile "com.fasterxml.jackson.module:jackson-module-afterburner:$jacksonVersion"
compile "com.fasterxml.jackson.module:jackson-module-scala_2.12:$jacksonVersion"
compile 'org.zalando:jackson-datatype-problem:0.22.0'
compile 'org.zalando:problem:0.22.0'
compile 'org.json:json:20180130'
Expand All @@ -81,8 +82,8 @@ dependencies {
compile 'io.opentracing:opentracing-api:0.31.0'
compile 'io.opentracing:opentracing-util:0.31.0'

compile 'org.apache.kafka:kafka-clients:2.1.0'
compile('org.apache.kafka:kafka_2.12:2.1.0') {
compile "org.apache.kafka:kafka-clients:$kafkaClientVersion"
compile("org.apache.kafka:kafka_2.12:$kafkaClientVersion") {
exclude module: "zookeeper"
}
compile("org.apache.curator:curator-recipes:$curatorVersion") {
Expand Down
1 change: 1 addition & 0 deletions api-cursors/build.gradle
Expand Up @@ -69,6 +69,7 @@ dependencies {
compile "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$jacksonVersion"
compile "com.fasterxml.jackson.datatype:jackson-datatype-joda:$jacksonVersion"
compile "com.fasterxml.jackson.module:jackson-module-afterburner:$jacksonVersion"
compile "com.fasterxml.jackson.module:jackson-module-scala_2.12:$jacksonVersion"
compile 'org.zalando:jackson-datatype-problem:0.22.0'
compile 'org.zalando:problem:0.22.0'
compile 'org.json:json:20180130'
Expand Down
1 change: 1 addition & 0 deletions api-misc/build.gradle
Expand Up @@ -68,6 +68,7 @@ dependencies {
compile "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$jacksonVersion"
compile "com.fasterxml.jackson.datatype:jackson-datatype-joda:$jacksonVersion"
compile "com.fasterxml.jackson.module:jackson-module-afterburner:$jacksonVersion"
compile "com.fasterxml.jackson.module:jackson-module-scala_2.12:$jacksonVersion"
compile 'org.zalando:jackson-datatype-problem:0.22.0'
compile 'org.zalando:problem:0.22.0'
compile 'org.json:json:20180130'
Expand Down
6 changes: 1 addition & 5 deletions app/build.gradle
Expand Up @@ -103,11 +103,6 @@ dependencies {
compile 'org.zalando:nakadi-plugin-api:3.2.1'
compile 'org.echocat.jomon:runtime:1.6.3'

// kafka & zookeeper
compile 'org.apache.kafka:kafka-clients:2.1.0'
compile('org.apache.kafka:kafka_2.12:2.1.0') {
exclude module: "zookeeper"
}
compile("org.apache.curator:curator-recipes:$curatorVersion") {
exclude module: "zookeeper"
}
Expand All @@ -127,6 +122,7 @@ dependencies {
compile "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$jacksonVersion"
compile "com.fasterxml.jackson.datatype:jackson-datatype-joda:$jacksonVersion"
compile "com.fasterxml.jackson.module:jackson-module-afterburner:$jacksonVersion"
compile "com.fasterxml.jackson.module:jackson-module-scala_2.12:$jacksonVersion"
compile 'org.zalando:twintip-spring-web:1.1.0'
compile 'org.json:json:20180130'

Expand Down
2 changes: 2 additions & 0 deletions app/src/main/resources/application.yml
Expand Up @@ -80,6 +80,8 @@ nakadi:
connectionString: zookeeper://zookeeper:2181
sessionTimeoutMs: 10000
connectionTimeoutMs: 3000
maxInFlightRequests: 1000

oauth2:
mode: BASIC
adminClientId: adminClientId
Expand Down
5 changes: 5 additions & 0 deletions build.gradle
Expand Up @@ -49,6 +49,11 @@ subprojects {
dependsOn test
finalizedBy jacocoTestReport
}
dependencies {
ext {
kafkaClientVersion = '2.3.1'
}
}
}

task testInitNakadi(type: Exec) {
Expand Down
6 changes: 4 additions & 2 deletions core-common/build.gradle
Expand Up @@ -67,6 +67,7 @@ dependencies {
compile "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$jacksonVersion"
compile "com.fasterxml.jackson.datatype:jackson-datatype-joda:$jacksonVersion"
compile "com.fasterxml.jackson.module:jackson-module-afterburner:$jacksonVersion"
compile "com.fasterxml.jackson.module:jackson-module-scala_2.12:$jacksonVersion"
compile 'org.zalando:jackson-datatype-problem:0.22.0'
compile 'org.zalando:problem:0.22.0'
compile 'org.json:json:20180130'
Expand All @@ -77,10 +78,11 @@ dependencies {
compile 'io.opentracing:opentracing-api:0.31.0'
compile 'io.opentracing:opentracing-util:0.31.0'

compile 'org.apache.kafka:kafka-clients:2.1.0'
compile('org.apache.kafka:kafka_2.12:2.1.0') {
compile "org.apache.kafka:kafka-clients:$kafkaClientVersion"
compile("org.apache.kafka:kafka_2.12:$kafkaClientVersion") {
exclude module: "zookeeper"
}

compile("org.apache.curator:curator-recipes:$curatorVersion") {
exclude module: "zookeeper"
}
Expand Down
Expand Up @@ -3,9 +3,10 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import kafka.admin.AdminUtils;
import kafka.server.ConfigType;
import kafka.utils.ZkUtils;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import kafka.zookeeper.ZooKeeperClient;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.consumer.Consumer;
Expand All @@ -19,6 +20,7 @@
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.utils.Time;
import org.echocat.jomon.runtime.concurrent.RetryForSpecifiedCountStrategy;
import org.echocat.jomon.runtime.concurrent.RetryForSpecifiedTimeStrategy;
import org.echocat.jomon.runtime.concurrent.Retryer;
Expand Down Expand Up @@ -208,15 +210,15 @@ public void repartition(final String topic, final int partitionsNumber) throws C
public String createTopic(final NakadiTopicConfig nakadiTopicConfig) throws TopicCreationException {

final KafkaTopicConfig kafkaTopicConfig = kafkaTopicConfigFactory.createKafkaTopicConfig(nakadiTopicConfig);
try {
doWithZkUtils(zkUtils -> {
AdminUtils.createTopic(zkUtils,
kafkaTopicConfig.getTopicName(),
kafkaTopicConfig.getPartitionCount(),
kafkaTopicConfig.getReplicaFactor(),
kafkaTopicConfigFactory.createKafkaTopicLevelProperties(kafkaTopicConfig),
kafkaTopicConfig.getRackAwareMode());
});
try (KafkaZkClient client = createZkClient()) {
final AdminZkClient adminZkClient = new AdminZkClient(client);
adminZkClient.createTopic(
kafkaTopicConfig.getTopicName(),
kafkaTopicConfig.getPartitionCount(),
kafkaTopicConfig.getReplicaFactor(),
kafkaTopicConfigFactory.createKafkaTopicLevelProperties(kafkaTopicConfig),
kafkaTopicConfig.getRackAwareMode()
);
} catch (final TopicExistsException e) {
throw new TopicCreationException("Topic with name " + kafkaTopicConfig.getTopicName() +
" already exists (or wasn't completely removed yet)", e);
Expand Down Expand Up @@ -244,9 +246,10 @@ public String createTopic(final NakadiTopicConfig nakadiTopicConfig) throws Topi

@Override
public void deleteTopic(final String topic) throws TopicDeletionException {
try {
try (KafkaZkClient zkClient = createZkClient()) {
// this will only trigger topic deletion, but the actual deletion is asynchronous
doWithZkUtils(zkUtils -> AdminUtils.deleteTopic(zkUtils, topic));
final AdminZkClient adminZkClient = new AdminZkClient(zkClient);
adminZkClient.deleteTopic(topic);
} catch (final Exception e) {
throw new TopicDeletionException("Unable to delete topic " + topic, e);
}
Expand Down Expand Up @@ -616,12 +619,11 @@ private Map<NakadiCursor, KafkaCursor> convertToKafkaCursors(final List<NakadiCu

@Override
public void setRetentionTime(final String topic, final Long retentionMs) throws TopicConfigException {
try {
doWithZkUtils(zkUtils -> {
final Properties topicProps = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), topic);
topicProps.setProperty("retention.ms", Long.toString(retentionMs));
AdminUtils.changeTopicConfig(zkUtils, topic, topicProps);
});
try (KafkaZkClient zkClient = createZkClient()) {
final AdminZkClient adminZkClient = new AdminZkClient(zkClient);
final Properties topicProps = adminZkClient.fetchEntityConfig(ConfigType.Topic(), topic);
topicProps.setProperty("retention.ms", Long.toString(retentionMs));
adminZkClient.changeTopicConfig(topic, topicProps);
} catch (final Exception e) {
throw new TopicConfigException("Unable to update retention time for topic " + topic, e);
}
Expand All @@ -636,24 +638,20 @@ private void validateCursorForNulls(final NakadiCursor cursor) throws InvalidCur
}
}

private void doWithZkUtils(final ZkUtilsAction action) throws Exception {
ZkUtils zkUtils = null;
try {
zkUtils = ZkUtils.apply(
kafkaZookeeper.getZookeeperConnectionString(),
zookeeperSettings.getZkSessionTimeoutMs(),
zookeeperSettings.getZkConnectionTimeoutMs(),
false);
action.execute(zkUtils);
} finally {
if (zkUtils != null) {
zkUtils.close();
}
}
}

@FunctionalInterface
private interface ZkUtilsAction {
void execute(ZkUtils zkUtils) throws Exception;
private KafkaZkClient createZkClient() {
// The calling method should make sure to close connection
return new KafkaZkClient(
new ZooKeeperClient(
kafkaZookeeper.getZookeeperConnectionString(),
zookeeperSettings.getZkSessionTimeoutMs(),
zookeeperSettings.getZkConnectionTimeoutMs(),
zookeeperSettings.getMaxInFlightRequests(),
Time.SYSTEM,
ZookeeperSettings.METRIC_GROUP,
ZookeeperSettings.METRIC_TYPE
),
false,
Time.SYSTEM
);
}
}
Expand Up @@ -9,12 +9,18 @@ public class ZookeeperSettings {

private final int zkSessionTimeoutMs;
private final int zkConnectionTimeoutMs;
private final int maxInFlightRequests;
public static final String METRIC_GROUP = "kafka.server";
public static final String METRIC_TYPE = "kakfaZookeeper";


@Autowired
public ZookeeperSettings(@Value("${nakadi.zookeeper.sessionTimeoutMs}") final int zkSessionTimeoutMs,
@Value("${nakadi.zookeeper.connectionTimeoutMs}")final int zkConnectionTimeoutMs) {
@Value("${nakadi.zookeeper.connectionTimeoutMs}")final int zkConnectionTimeoutMs,
@Value("${nakadi.zookeeper.maxInFlightRequests}") final int maxInFlightRequests) {
this.zkSessionTimeoutMs = zkSessionTimeoutMs;
this.zkConnectionTimeoutMs = zkConnectionTimeoutMs;
this.maxInFlightRequests = maxInFlightRequests;
}

public int getZkSessionTimeoutMs() {
Expand All @@ -24,4 +30,8 @@ public int getZkSessionTimeoutMs() {
public int getZkConnectionTimeoutMs() {
return zkConnectionTimeoutMs;
}

public int getMaxInFlightRequests() {
return maxInFlightRequests;
}
}
5 changes: 3 additions & 2 deletions core-metastore/build.gradle
Expand Up @@ -69,6 +69,7 @@ dependencies {
compile "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$jacksonVersion"
compile "com.fasterxml.jackson.datatype:jackson-datatype-joda:$jacksonVersion"
compile "com.fasterxml.jackson.module:jackson-module-afterburner:$jacksonVersion"
compile "com.fasterxml.jackson.module:jackson-module-scala_2.12:$jacksonVersion"
compile 'org.zalando:jackson-datatype-problem:0.22.0'
compile 'org.zalando:problem:0.22.0'
compile 'org.json:json:20180130'
Expand All @@ -79,8 +80,8 @@ dependencies {
compile 'io.opentracing:opentracing-api:0.31.0'
compile 'io.opentracing:opentracing-util:0.31.0'

compile 'org.apache.kafka:kafka-clients:2.1.0'
compile('org.apache.kafka:kafka_2.12:2.1.0') {
compile "org.apache.kafka:kafka-clients:$kafkaClientVersion"
compile("org.apache.kafka:kafka_2.12:$kafkaClientVersion") {
exclude module: "zookeeper"
}
compile("org.apache.curator:curator-recipes:$curatorVersion") {
Expand Down
5 changes: 3 additions & 2 deletions core-services/build.gradle
Expand Up @@ -70,6 +70,7 @@ dependencies {
compile "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$jacksonVersion"
compile "com.fasterxml.jackson.datatype:jackson-datatype-joda:$jacksonVersion"
compile "com.fasterxml.jackson.module:jackson-module-afterburner:$jacksonVersion"
compile "com.fasterxml.jackson.module:jackson-module-scala_2.12:$jacksonVersion"
compile 'org.zalando:jackson-datatype-problem:0.22.0'
compile 'org.zalando:problem:0.22.0'
compile 'org.json:json:20180130'
Expand All @@ -80,8 +81,8 @@ dependencies {
compile 'io.opentracing:opentracing-api:0.31.0'
compile 'io.opentracing:opentracing-util:0.31.0'

compile 'org.apache.kafka:kafka-clients:2.1.0'
compile('org.apache.kafka:kafka_2.12:2.1.0') {
compile "org.apache.kafka:kafka-clients:$kafkaClientVersion"
compile("org.apache.kafka:kafka_2.12:$kafkaClientVersion") {
exclude module: "zookeeper"
}
compile("org.apache.curator:curator-recipes:$curatorVersion") {
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Expand Up @@ -43,7 +43,7 @@ services:
- "2181:2181"

kafka:
image: wurstmeister/kafka:2.11-1.1.1
image: wurstmeister/kafka:2.12-2.4.0
ports:
- "29092:29092"
- "9092:9092"
Expand Down

0 comments on commit e129547

Please sign in to comment.