From ce24294286e7ce2dfdef0333da1f081ff06c4dcc Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Fri, 15 Jan 2021 18:06:25 +0200 Subject: [PATCH 1/6] upgrade to 2.7.0 --- README.md | 4 ++-- pom.xml | 4 ++-- .../kafka/config/AbstractConfigBuilder.java | 21 +++++++++++++++++++ .../producer/ProducerConfigBuilder.java | 11 +++++++--- .../config/streams/StreamsConfigBuilder.java | 8 +++---- .../config/admin/AdminConfigBuilderTest.java | 4 +++- 6 files changed, 40 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 0e1773f..fd20051 100644 --- a/README.md +++ b/README.md @@ -105,7 +105,7 @@ Binaries and dependency information for Maven, Ivy, Gradle and others can be fou Example for Gradle: ```groovy -compile 'net.soundvibe:kafka-config:2.6.1' +compile 'net.soundvibe:kafka-config:2.7.0' ``` and for Maven: @@ -114,7 +114,7 @@ and for Maven: net.soundvibe kafka-config - 2.6.1 + 2.7.0 ``` diff --git a/pom.xml b/pom.xml index ac511fc..1a31c52 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ net.soundvibe kafka-config - 2.6.1 + 2.7.0 jar kafka-config Typesafe configuration for Kafka clients @@ -36,7 +36,7 @@ 1.8 UTF-8 - 2.6.1 + 2.7.0 diff --git a/src/main/java/net/soundvibe/kafka/config/AbstractConfigBuilder.java b/src/main/java/net/soundvibe/kafka/config/AbstractConfigBuilder.java index 05eb713..c45fd68 100644 --- a/src/main/java/net/soundvibe/kafka/config/AbstractConfigBuilder.java +++ b/src/main/java/net/soundvibe/kafka/config/AbstractConfigBuilder.java @@ -202,6 +202,27 @@ public final T withSecurityProviders(Class... return (T) this; } + /** + * The amount of time the client will wait for the socket connection to be established. + * If the connection is not built before the timeout elapses, clients will close the socket channel. + * Default: 10s. + */ + public T withSocketConnectionSetupTimeout(Duration socketConnectionSetupTimeout) { + props.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, socketConnectionSetupTimeout.toMillis()); + return (T) this; + } + + /** + * The maximum amount of time the client will wait for the socket connection to be established. + * The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. + * To avoid connection storms, a randomization factor of 0.2 will be applied to the timeout resulting in a random range between 20% below and 20% above the computed value. + * Default: 127s. + */ + public T withSocketConnectionSetupTimeoutMax(Duration socketConnectionSetupTimeoutMax) { + props.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, socketConnectionSetupTimeoutMax.toMillis()); + return (T) this; + } + /** * Use custom configuration property * @param name property name diff --git a/src/main/java/net/soundvibe/kafka/config/producer/ProducerConfigBuilder.java b/src/main/java/net/soundvibe/kafka/config/producer/ProducerConfigBuilder.java index 4913190..c138a0e 100644 --- a/src/main/java/net/soundvibe/kafka/config/producer/ProducerConfigBuilder.java +++ b/src/main/java/net/soundvibe/kafka/config/producer/ProducerConfigBuilder.java @@ -101,9 +101,14 @@ public ProducerConfigBuilder withMaxRequestSize(int maxRequestSize) { } /** - * The configuration controls how long KafkaProducer.send() and KafkaProducer.partitionsFor() will block. - * These methods can be blocked either because the buffer is full or metadata unavailable. - * Blocking in the user-supplied serializers or partitioner will not be counted against this timeout. + * The configuration controls how long the KafkaProducer's send(), partitionsFor(), " + * initTransactions(), sendOffsetsToTransaction(), commitTransaction() " + * and abortTransaction() methods will block. " + * For send() this timeout bounds the total time waiting for both metadata fetch and buffer allocation " + * (blocking in the user-supplied serializers or partitioner is not counted against this timeout). " + * For partitionsFor() this timeout bounds the time spent waiting for metadata if it is unavailable. " + * The transaction-related methods always block, but may timeout if " + * the transaction coordinator could not be discovered or did not respond within the timeout. */ public ProducerConfigBuilder withMaxBlock(Duration maxBlock) { props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlock.toMillis()); diff --git a/src/main/java/net/soundvibe/kafka/config/streams/StreamsConfigBuilder.java b/src/main/java/net/soundvibe/kafka/config/streams/StreamsConfigBuilder.java index 1a4bac5..6e9e7d3 100644 --- a/src/main/java/net/soundvibe/kafka/config/streams/StreamsConfigBuilder.java +++ b/src/main/java/net/soundvibe/kafka/config/streams/StreamsConfigBuilder.java @@ -73,7 +73,7 @@ public StreamsConfigBuilder withDefaultDeserializationExceptionHandler(Classorg.apache.kafka.common.serialization.Serde interface via * DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS or DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS as well */ - public StreamsConfigBuilder withDefaultKeySerde(Class defaultKeySerde) { + public StreamsConfigBuilder withDefaultKeySerde(Class> defaultKeySerde) { props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, defaultKeySerde); return this; } @@ -83,7 +83,7 @@ public StreamsConfigBuilder withDefaultKeySerde(Class defaultKe * Note when windowed serde class is used, one needs to set the inner serde class that implements the org.apache.kafka.common.serialization.Serde interface via * DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS or DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS as well */ - public StreamsConfigBuilder withDefaultValueSerde(Class defaultValueSerde) { + public StreamsConfigBuilder withDefaultValueSerde(Class> defaultValueSerde) { props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, defaultValueSerde); return this; } @@ -143,7 +143,7 @@ public StreamsConfigBuilder withProcessingGuarantee(ProcessingGuarantee processi * A configuration telling Kafka Streams if it should optimize the topology, disabled by default */ public StreamsConfigBuilder withTopologyOptimization(TopologyOptimization topologyOptimization) { - props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, topologyOptimization.name); + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, topologyOptimization.name); return this; } @@ -214,7 +214,7 @@ public StreamsConfigBuilder withStateCleanupDelay(Duration stateCleanupDelay) { * Default is null. Accepted values are UPGRADE_FROM_0100, UPGRADE_FROM_0101, UPGRADE_FROM_0102, UPGRADE_FROM_0110, UPGRADE_FROM_10, UPGRADE_FROM_11(for upgrading from the corresponding old version). */ public StreamsConfigBuilder withUpgradeFrom(UpgradeFrom upgradeFrom) { - props.put(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom == null ? upgradeFrom : upgradeFrom.version); + props.put(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom == null ? null : upgradeFrom.version); return this; } diff --git a/src/test/java/net/soundvibe/kafka/config/admin/AdminConfigBuilderTest.java b/src/test/java/net/soundvibe/kafka/config/admin/AdminConfigBuilderTest.java index 0886461..008ab11 100644 --- a/src/test/java/net/soundvibe/kafka/config/admin/AdminConfigBuilderTest.java +++ b/src/test/java/net/soundvibe/kafka/config/admin/AdminConfigBuilderTest.java @@ -33,6 +33,8 @@ void should_build_all_properties() { .withRetryBackoff(Duration.ofSeconds(5)) .withSendBufferBytes(1024) .withCustomMap(new HashMap<>()) + .withSocketConnectionSetupTimeout(Duration.ofSeconds(15)) + .withSocketConnectionSetupTimeoutMax(Duration.ofSeconds(30)) .buildProperties(); assertEquals(BOOTSTRAP_SERVERS, adminProps.getProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG)); @@ -42,4 +44,4 @@ void should_build_all_properties() { private void assertValid(Properties properties) { assertDoesNotThrow(() -> AdminClient.create(properties)); } -} \ No newline at end of file +} From cbdca381b0abc0b62e2032641bb871c3845cdb93 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Fri, 15 Jan 2021 18:15:19 +0200 Subject: [PATCH 2/6] update travis to run on java 15 --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index ef22b58..559d5bb 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,6 +12,7 @@ matrix: - jdk: openjdk12 - jdk: openjdk13 - jdk: openjdk14 + - jdk: openjdk15 after_success: - bash <(curl -s https://codecov.io/bash) From 208bc23e9fa1c82d45a85856d27b26daeb1c8486 Mon Sep 17 00:00:00 2001 From: soundvibe Date: Thu, 4 Feb 2021 23:01:21 +0200 Subject: [PATCH 3/6] enable streams tests. deleted travis config (using github actions now). --- .travis.yml | 19 ------------------- .../streams/StreamsConfigBuilderTest.java | 2 -- 2 files changed, 21 deletions(-) delete mode 100644 .travis.yml diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 559d5bb..0000000 --- a/.travis.yml +++ /dev/null @@ -1,19 +0,0 @@ -language: java - -sudo: false -dist: trusty - -matrix: - include: - - jdk: oraclejdk8 - - jdk: oraclejdk9 - - jdk: openjdk10 - - jdk: openjdk11 - - jdk: openjdk12 - - jdk: openjdk13 - - jdk: openjdk14 - - jdk: openjdk15 - -after_success: - - bash <(curl -s https://codecov.io/bash) - diff --git a/src/test/java/net/soundvibe/kafka/config/streams/StreamsConfigBuilderTest.java b/src/test/java/net/soundvibe/kafka/config/streams/StreamsConfigBuilderTest.java index cb0acdf..afb5dea 100644 --- a/src/test/java/net/soundvibe/kafka/config/streams/StreamsConfigBuilderTest.java +++ b/src/test/java/net/soundvibe/kafka/config/streams/StreamsConfigBuilderTest.java @@ -22,7 +22,6 @@ class StreamsConfigBuilderTest { private static final String BOOTSTRAP_SERVERS = "http://localhost:9876"; @Test - @DisabledOnOs(WINDOWS) void should_build_all_properties() throws IOException { Properties streamProps = StreamsConfigBuilder.create() .withBootstrapServers(BOOTSTRAP_SERVERS) @@ -73,7 +72,6 @@ void should_build_all_properties() throws IOException { } @Test - @DisabledOnOs(WINDOWS) void should_set_upgrade_from_to_null() { Properties streamProps = StreamsConfigBuilder.create() .withBootstrapServers(BOOTSTRAP_SERVERS) From dce89eb1b35402fce73a0f48aab75e772ab0357e Mon Sep 17 00:00:00 2001 From: soundvibe Date: Thu, 4 Feb 2021 23:03:55 +0200 Subject: [PATCH 4/6] run github actions only on push --- .github/workflows/maven.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index 6bf52e8..3dabc7a 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -1,6 +1,6 @@ name: Build and run tests -on: [push, pull_request] +on: [push] jobs: compile: From 154fa3767f5cb174f80a291d19ce3c49a9d88a88 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=CE=BBinas?= Date: Thu, 4 Feb 2021 23:06:55 +0200 Subject: [PATCH 5/6] Update maven.yml --- .github/workflows/maven.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index 6bf52e8..3dabc7a 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -1,6 +1,6 @@ name: Build and run tests -on: [push, pull_request] +on: [push] jobs: compile: From 6000cd4b0d5af659c4a1e3b79bc981a71765246e Mon Sep 17 00:00:00 2001 From: soundvibe Date: Fri, 5 Feb 2021 01:28:47 +0200 Subject: [PATCH 6/6] increased nexus staging timeout --- pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/pom.xml b/pom.xml index 1a31c52..4767c32 100644 --- a/pom.xml +++ b/pom.xml @@ -154,6 +154,7 @@ ossrh https://oss.sonatype.org/ true + 15