From e129547cc2f565dcc7159273681b165160ac3f88 Mon Sep 17 00:00:00 2001 From: Suyash Garg Date: Mon, 24 Feb 2020 16:39:13 +0100 Subject: [PATCH] Revert "Revert "Update Kafka Clients to 2.3.0"" --- acceptance-test/build.gradle | 1 + .../repository/kafka/KafkaRepositoryAT.java | 3 +- .../repository/kafka/KafkaTestHelper.java | 42 +++++++---- api-consumption/build.gradle | 5 +- api-cursors/build.gradle | 1 + api-misc/build.gradle | 1 + app/build.gradle | 6 +- app/src/main/resources/application.yml | 2 + build.gradle | 5 ++ core-common/build.gradle | 6 +- .../kafka/KafkaTopicRepository.java | 74 +++++++++---------- .../zookeeper/ZookeeperSettings.java | 12 ++- core-metastore/build.gradle | 5 +- core-services/build.gradle | 5 +- docker-compose.yml | 2 +- 15 files changed, 103 insertions(+), 67 deletions(-) diff --git a/acceptance-test/build.gradle b/acceptance-test/build.gradle index 4afb461cb7..1f8b536fdd 100644 --- a/acceptance-test/build.gradle +++ b/acceptance-test/build.gradle @@ -83,6 +83,7 @@ dependencies { compile "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$jacksonVersion" compile "com.fasterxml.jackson.datatype:jackson-datatype-joda:$jacksonVersion" compile "com.fasterxml.jackson.module:jackson-module-afterburner:$jacksonVersion" + compile "com.fasterxml.jackson.module:jackson-module-scala_2.12:$jacksonVersion" compile 'org.zalando:jackson-datatype-problem:0.22.0' compile 'org.zalando:problem:0.22.0' compile 'org.json:json:20180130' diff --git a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java index d1a670ca93..1a5e458d99 100644 --- a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java +++ b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java @@ -46,6 +46,7 @@ public class KafkaRepositoryAT extends BaseAT { private static final int DEFAULT_COMMIT_TIMEOUT = 60; private static final int ZK_SESSION_TIMEOUT = 30000; private static final int ZK_CONNECTION_TIMEOUT = 10000; + private static final int ZK_MAX_IN_FLIGHT_REQUESTS = 1000; private static final int NAKADI_SEND_TIMEOUT = 10000; private static final int NAKADI_POLL_TIMEOUT = 10000; private static final Long DEFAULT_RETENTION_TIME = 100L; @@ -101,7 +102,7 @@ public void setup() { kafkaSettings = new KafkaSettings(KAFKA_REQUEST_TIMEOUT, KAFKA_BATCH_SIZE, KAFKA_BUFFER_MEMORY, KAFKA_LINGER_MS, KAFKA_ENABLE_AUTO_COMMIT, KAFKA_MAX_REQUEST_SIZE, KAFKA_DELIVERY_TIMEOUT, KAFKA_MAX_BLOCK_TIMEOUT); - zookeeperSettings = new ZookeeperSettings(ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT); + zookeeperSettings = new ZookeeperSettings(ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, ZK_MAX_IN_FLIGHT_REQUESTS); kafkaHelper = new KafkaTestHelper(KAFKA_URL); defaultTopicConfig = new NakadiTopicConfig(DEFAULT_PARTITION_COUNT, DEFAULT_CLEANUP_POLICY, Optional.of(DEFAULT_RETENTION_TIME)); diff --git a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaTestHelper.java b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaTestHelper.java index 37704d2072..394e040cda 100644 --- a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaTestHelper.java +++ b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaTestHelper.java @@ -1,14 +1,16 @@ package org.zalando.nakadi.repository.kafka; -import kafka.admin.AdminUtils; import kafka.admin.RackAwareMode; import kafka.server.ConfigType; -import kafka.utils.ZkUtils; +import kafka.zk.AdminZkClient; +import kafka.zk.KafkaZkClient; +import kafka.zookeeper.ZooKeeperClient; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Time; import org.zalando.nakadi.view.Cursor; import java.util.List; @@ -98,17 +100,29 @@ public List getNextOffsets(final String topic) { } public void createTopic(final String topic, final String zkUrl) { - ZkUtils zkUtils = null; - try { - zkUtils = ZkUtils.apply(zkUrl, 30000, 10000, false); - AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties(), RackAwareMode.Safe$.MODULE$); - } finally { - if (zkUtils != null) { - zkUtils.close(); - } + try (KafkaZkClient zkClient = createZkClient(zkUrl)) { + final AdminZkClient adminZkClient = new AdminZkClient(zkClient); + adminZkClient.createTopic(topic, 1, 1, + new Properties(), RackAwareMode.Safe$.MODULE$); } } + private static KafkaZkClient createZkClient(final String zkUrl) { + return new KafkaZkClient( + new ZooKeeperClient( + zkUrl, + 30000, + 10000, + 1000, + Time.SYSTEM, + "dummyMetricGroup", + "dummyMetricType" + ), + false, + Time.SYSTEM + ); + } + public static Long getTopicRetentionTime(final String topic, final String zkPath) { return Long.valueOf(getTopicProperty(topic, zkPath, "retention.ms")); } @@ -118,8 +132,10 @@ public static String getTopicCleanupPolicy(final String topic, final String zkPa } public static String getTopicProperty(final String topic, final String zkPath, final String propertyName) { - final ZkUtils zkUtils = ZkUtils.apply(zkPath, 30000, 10000, false); - final Properties topicConfig = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), topic); - return topicConfig.getProperty(propertyName); + try (KafkaZkClient zkClient = createZkClient(zkPath)) { + final AdminZkClient adminZkClient = new AdminZkClient(zkClient); + final Properties topicConfig = adminZkClient.fetchEntityConfig(ConfigType.Topic(), topic); + return topicConfig.getProperty(propertyName); + } } } diff --git a/api-consumption/build.gradle b/api-consumption/build.gradle index 1988a845c1..47f6147c14 100644 --- a/api-consumption/build.gradle +++ b/api-consumption/build.gradle @@ -71,6 +71,7 @@ dependencies { compile "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$jacksonVersion" compile "com.fasterxml.jackson.datatype:jackson-datatype-joda:$jacksonVersion" compile "com.fasterxml.jackson.module:jackson-module-afterburner:$jacksonVersion" + compile "com.fasterxml.jackson.module:jackson-module-scala_2.12:$jacksonVersion" compile 'org.zalando:jackson-datatype-problem:0.22.0' compile 'org.zalando:problem:0.22.0' compile 'org.json:json:20180130' @@ -81,8 +82,8 @@ dependencies { compile 'io.opentracing:opentracing-api:0.31.0' compile 'io.opentracing:opentracing-util:0.31.0' - compile 'org.apache.kafka:kafka-clients:2.1.0' - compile('org.apache.kafka:kafka_2.12:2.1.0') { + compile "org.apache.kafka:kafka-clients:$kafkaClientVersion" + compile("org.apache.kafka:kafka_2.12:$kafkaClientVersion") { exclude module: "zookeeper" } compile("org.apache.curator:curator-recipes:$curatorVersion") { diff --git a/api-cursors/build.gradle b/api-cursors/build.gradle index 8d185a759f..cde6d71ae0 100644 --- a/api-cursors/build.gradle +++ b/api-cursors/build.gradle @@ -69,6 +69,7 @@ dependencies { compile "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$jacksonVersion" compile "com.fasterxml.jackson.datatype:jackson-datatype-joda:$jacksonVersion" compile "com.fasterxml.jackson.module:jackson-module-afterburner:$jacksonVersion" + compile "com.fasterxml.jackson.module:jackson-module-scala_2.12:$jacksonVersion" compile 'org.zalando:jackson-datatype-problem:0.22.0' compile 'org.zalando:problem:0.22.0' compile 'org.json:json:20180130' diff --git a/api-misc/build.gradle b/api-misc/build.gradle index ce8c4c156d..73bad15970 100644 --- a/api-misc/build.gradle +++ b/api-misc/build.gradle @@ -68,6 +68,7 @@ dependencies { compile "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$jacksonVersion" compile "com.fasterxml.jackson.datatype:jackson-datatype-joda:$jacksonVersion" compile "com.fasterxml.jackson.module:jackson-module-afterburner:$jacksonVersion" + compile "com.fasterxml.jackson.module:jackson-module-scala_2.12:$jacksonVersion" compile 'org.zalando:jackson-datatype-problem:0.22.0' compile 'org.zalando:problem:0.22.0' compile 'org.json:json:20180130' diff --git a/app/build.gradle b/app/build.gradle index 8200b7c9eb..655a1abd05 100644 --- a/app/build.gradle +++ b/app/build.gradle @@ -103,11 +103,6 @@ dependencies { compile 'org.zalando:nakadi-plugin-api:3.2.1' compile 'org.echocat.jomon:runtime:1.6.3' - // kafka & zookeeper - compile 'org.apache.kafka:kafka-clients:2.1.0' - compile('org.apache.kafka:kafka_2.12:2.1.0') { - exclude module: "zookeeper" - } compile("org.apache.curator:curator-recipes:$curatorVersion") { exclude module: "zookeeper" } @@ -127,6 +122,7 @@ dependencies { compile "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$jacksonVersion" compile "com.fasterxml.jackson.datatype:jackson-datatype-joda:$jacksonVersion" compile "com.fasterxml.jackson.module:jackson-module-afterburner:$jacksonVersion" + compile "com.fasterxml.jackson.module:jackson-module-scala_2.12:$jacksonVersion" compile 'org.zalando:twintip-spring-web:1.1.0' compile 'org.json:json:20180130' diff --git a/app/src/main/resources/application.yml b/app/src/main/resources/application.yml index 301e7b55c7..d268ffbeb6 100644 --- a/app/src/main/resources/application.yml +++ b/app/src/main/resources/application.yml @@ -80,6 +80,8 @@ nakadi: connectionString: zookeeper://zookeeper:2181 sessionTimeoutMs: 10000 connectionTimeoutMs: 3000 + maxInFlightRequests: 1000 + oauth2: mode: BASIC adminClientId: adminClientId diff --git a/build.gradle b/build.gradle index 4c98ed9e7c..47da57cd6a 100644 --- a/build.gradle +++ b/build.gradle @@ -49,6 +49,11 @@ subprojects { dependsOn test finalizedBy jacocoTestReport } + dependencies { + ext { + kafkaClientVersion = '2.3.1' + } + } } task testInitNakadi(type: Exec) { diff --git a/core-common/build.gradle b/core-common/build.gradle index ca18a3f27a..bbc8ec17af 100644 --- a/core-common/build.gradle +++ b/core-common/build.gradle @@ -67,6 +67,7 @@ dependencies { compile "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$jacksonVersion" compile "com.fasterxml.jackson.datatype:jackson-datatype-joda:$jacksonVersion" compile "com.fasterxml.jackson.module:jackson-module-afterburner:$jacksonVersion" + compile "com.fasterxml.jackson.module:jackson-module-scala_2.12:$jacksonVersion" compile 'org.zalando:jackson-datatype-problem:0.22.0' compile 'org.zalando:problem:0.22.0' compile 'org.json:json:20180130' @@ -77,10 +78,11 @@ dependencies { compile 'io.opentracing:opentracing-api:0.31.0' compile 'io.opentracing:opentracing-util:0.31.0' - compile 'org.apache.kafka:kafka-clients:2.1.0' - compile('org.apache.kafka:kafka_2.12:2.1.0') { + compile "org.apache.kafka:kafka-clients:$kafkaClientVersion" + compile("org.apache.kafka:kafka_2.12:$kafkaClientVersion") { exclude module: "zookeeper" } + compile("org.apache.curator:curator-recipes:$curatorVersion") { exclude module: "zookeeper" } diff --git a/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java b/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java index 441c8f6088..20eb7d0c45 100644 --- a/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java +++ b/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java @@ -3,9 +3,10 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import kafka.admin.AdminUtils; import kafka.server.ConfigType; -import kafka.utils.ZkUtils; +import kafka.zk.AdminZkClient; +import kafka.zk.KafkaZkClient; +import kafka.zookeeper.ZooKeeperClient; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.NewPartitions; import org.apache.kafka.clients.consumer.Consumer; @@ -19,6 +20,7 @@ import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.utils.Time; import org.echocat.jomon.runtime.concurrent.RetryForSpecifiedCountStrategy; import org.echocat.jomon.runtime.concurrent.RetryForSpecifiedTimeStrategy; import org.echocat.jomon.runtime.concurrent.Retryer; @@ -208,15 +210,15 @@ public void repartition(final String topic, final int partitionsNumber) throws C public String createTopic(final NakadiTopicConfig nakadiTopicConfig) throws TopicCreationException { final KafkaTopicConfig kafkaTopicConfig = kafkaTopicConfigFactory.createKafkaTopicConfig(nakadiTopicConfig); - try { - doWithZkUtils(zkUtils -> { - AdminUtils.createTopic(zkUtils, - kafkaTopicConfig.getTopicName(), - kafkaTopicConfig.getPartitionCount(), - kafkaTopicConfig.getReplicaFactor(), - kafkaTopicConfigFactory.createKafkaTopicLevelProperties(kafkaTopicConfig), - kafkaTopicConfig.getRackAwareMode()); - }); + try (KafkaZkClient client = createZkClient()) { + final AdminZkClient adminZkClient = new AdminZkClient(client); + adminZkClient.createTopic( + kafkaTopicConfig.getTopicName(), + kafkaTopicConfig.getPartitionCount(), + kafkaTopicConfig.getReplicaFactor(), + kafkaTopicConfigFactory.createKafkaTopicLevelProperties(kafkaTopicConfig), + kafkaTopicConfig.getRackAwareMode() + ); } catch (final TopicExistsException e) { throw new TopicCreationException("Topic with name " + kafkaTopicConfig.getTopicName() + " already exists (or wasn't completely removed yet)", e); @@ -244,9 +246,10 @@ public String createTopic(final NakadiTopicConfig nakadiTopicConfig) throws Topi @Override public void deleteTopic(final String topic) throws TopicDeletionException { - try { + try (KafkaZkClient zkClient = createZkClient()) { // this will only trigger topic deletion, but the actual deletion is asynchronous - doWithZkUtils(zkUtils -> AdminUtils.deleteTopic(zkUtils, topic)); + final AdminZkClient adminZkClient = new AdminZkClient(zkClient); + adminZkClient.deleteTopic(topic); } catch (final Exception e) { throw new TopicDeletionException("Unable to delete topic " + topic, e); } @@ -616,12 +619,11 @@ private Map convertToKafkaCursors(final List { - final Properties topicProps = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), topic); - topicProps.setProperty("retention.ms", Long.toString(retentionMs)); - AdminUtils.changeTopicConfig(zkUtils, topic, topicProps); - }); + try (KafkaZkClient zkClient = createZkClient()) { + final AdminZkClient adminZkClient = new AdminZkClient(zkClient); + final Properties topicProps = adminZkClient.fetchEntityConfig(ConfigType.Topic(), topic); + topicProps.setProperty("retention.ms", Long.toString(retentionMs)); + adminZkClient.changeTopicConfig(topic, topicProps); } catch (final Exception e) { throw new TopicConfigException("Unable to update retention time for topic " + topic, e); } @@ -636,24 +638,20 @@ private void validateCursorForNulls(final NakadiCursor cursor) throws InvalidCur } } - private void doWithZkUtils(final ZkUtilsAction action) throws Exception { - ZkUtils zkUtils = null; - try { - zkUtils = ZkUtils.apply( - kafkaZookeeper.getZookeeperConnectionString(), - zookeeperSettings.getZkSessionTimeoutMs(), - zookeeperSettings.getZkConnectionTimeoutMs(), - false); - action.execute(zkUtils); - } finally { - if (zkUtils != null) { - zkUtils.close(); - } - } - } - - @FunctionalInterface - private interface ZkUtilsAction { - void execute(ZkUtils zkUtils) throws Exception; + private KafkaZkClient createZkClient() { + // The calling method should make sure to close connection + return new KafkaZkClient( + new ZooKeeperClient( + kafkaZookeeper.getZookeeperConnectionString(), + zookeeperSettings.getZkSessionTimeoutMs(), + zookeeperSettings.getZkConnectionTimeoutMs(), + zookeeperSettings.getMaxInFlightRequests(), + Time.SYSTEM, + ZookeeperSettings.METRIC_GROUP, + ZookeeperSettings.METRIC_TYPE + ), + false, + Time.SYSTEM + ); } } diff --git a/core-common/src/main/java/org/zalando/nakadi/repository/zookeeper/ZookeeperSettings.java b/core-common/src/main/java/org/zalando/nakadi/repository/zookeeper/ZookeeperSettings.java index 05ee3616eb..8f78370485 100644 --- a/core-common/src/main/java/org/zalando/nakadi/repository/zookeeper/ZookeeperSettings.java +++ b/core-common/src/main/java/org/zalando/nakadi/repository/zookeeper/ZookeeperSettings.java @@ -9,12 +9,18 @@ public class ZookeeperSettings { private final int zkSessionTimeoutMs; private final int zkConnectionTimeoutMs; + private final int maxInFlightRequests; + public static final String METRIC_GROUP = "kafka.server"; + public static final String METRIC_TYPE = "kakfaZookeeper"; + @Autowired public ZookeeperSettings(@Value("${nakadi.zookeeper.sessionTimeoutMs}") final int zkSessionTimeoutMs, - @Value("${nakadi.zookeeper.connectionTimeoutMs}")final int zkConnectionTimeoutMs) { + @Value("${nakadi.zookeeper.connectionTimeoutMs}")final int zkConnectionTimeoutMs, + @Value("${nakadi.zookeeper.maxInFlightRequests}") final int maxInFlightRequests) { this.zkSessionTimeoutMs = zkSessionTimeoutMs; this.zkConnectionTimeoutMs = zkConnectionTimeoutMs; + this.maxInFlightRequests = maxInFlightRequests; } public int getZkSessionTimeoutMs() { @@ -24,4 +30,8 @@ public int getZkSessionTimeoutMs() { public int getZkConnectionTimeoutMs() { return zkConnectionTimeoutMs; } + + public int getMaxInFlightRequests() { + return maxInFlightRequests; + } } diff --git a/core-metastore/build.gradle b/core-metastore/build.gradle index d92e7babd8..fd8861a16b 100644 --- a/core-metastore/build.gradle +++ b/core-metastore/build.gradle @@ -69,6 +69,7 @@ dependencies { compile "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$jacksonVersion" compile "com.fasterxml.jackson.datatype:jackson-datatype-joda:$jacksonVersion" compile "com.fasterxml.jackson.module:jackson-module-afterburner:$jacksonVersion" + compile "com.fasterxml.jackson.module:jackson-module-scala_2.12:$jacksonVersion" compile 'org.zalando:jackson-datatype-problem:0.22.0' compile 'org.zalando:problem:0.22.0' compile 'org.json:json:20180130' @@ -79,8 +80,8 @@ dependencies { compile 'io.opentracing:opentracing-api:0.31.0' compile 'io.opentracing:opentracing-util:0.31.0' - compile 'org.apache.kafka:kafka-clients:2.1.0' - compile('org.apache.kafka:kafka_2.12:2.1.0') { + compile "org.apache.kafka:kafka-clients:$kafkaClientVersion" + compile("org.apache.kafka:kafka_2.12:$kafkaClientVersion") { exclude module: "zookeeper" } compile("org.apache.curator:curator-recipes:$curatorVersion") { diff --git a/core-services/build.gradle b/core-services/build.gradle index d856f78544..b4388719bc 100644 --- a/core-services/build.gradle +++ b/core-services/build.gradle @@ -70,6 +70,7 @@ dependencies { compile "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$jacksonVersion" compile "com.fasterxml.jackson.datatype:jackson-datatype-joda:$jacksonVersion" compile "com.fasterxml.jackson.module:jackson-module-afterburner:$jacksonVersion" + compile "com.fasterxml.jackson.module:jackson-module-scala_2.12:$jacksonVersion" compile 'org.zalando:jackson-datatype-problem:0.22.0' compile 'org.zalando:problem:0.22.0' compile 'org.json:json:20180130' @@ -80,8 +81,8 @@ dependencies { compile 'io.opentracing:opentracing-api:0.31.0' compile 'io.opentracing:opentracing-util:0.31.0' - compile 'org.apache.kafka:kafka-clients:2.1.0' - compile('org.apache.kafka:kafka_2.12:2.1.0') { + compile "org.apache.kafka:kafka-clients:$kafkaClientVersion" + compile("org.apache.kafka:kafka_2.12:$kafkaClientVersion") { exclude module: "zookeeper" } compile("org.apache.curator:curator-recipes:$curatorVersion") { diff --git a/docker-compose.yml b/docker-compose.yml index e835d0cf63..b2d8f8f878 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -43,7 +43,7 @@ services: - "2181:2181" kafka: - image: wurstmeister/kafka:2.11-1.1.1 + image: wurstmeister/kafka:2.12-2.4.0 ports: - "29092:29092" - "9092:9092"