diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index 6bf52e8..3dabc7a 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -1,6 +1,6 @@ name: Build and run tests -on: [push, pull_request] +on: [push] jobs: compile: diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index ef22b58..0000000 --- a/.travis.yml +++ /dev/null @@ -1,18 +0,0 @@ -language: java - -sudo: false -dist: trusty - -matrix: - include: - - jdk: oraclejdk8 - - jdk: oraclejdk9 - - jdk: openjdk10 - - jdk: openjdk11 - - jdk: openjdk12 - - jdk: openjdk13 - - jdk: openjdk14 - -after_success: - - bash <(curl -s https://codecov.io/bash) - diff --git a/README.md b/README.md index 0e1773f..fd20051 100644 --- a/README.md +++ b/README.md @@ -105,7 +105,7 @@ Binaries and dependency information for Maven, Ivy, Gradle and others can be fou Example for Gradle: ```groovy -compile 'net.soundvibe:kafka-config:2.6.1' +compile 'net.soundvibe:kafka-config:2.7.0' ``` and for Maven: @@ -114,7 +114,7 @@ and for Maven: net.soundvibe kafka-config - 2.6.1 + 2.7.0 ``` diff --git a/pom.xml b/pom.xml index ac511fc..4767c32 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ net.soundvibe kafka-config - 2.6.1 + 2.7.0 jar kafka-config Typesafe configuration for Kafka clients @@ -36,7 +36,7 @@ 1.8 UTF-8 - 2.6.1 + 2.7.0 @@ -154,6 +154,7 @@ ossrh https://oss.sonatype.org/ true + 15 diff --git a/src/main/java/net/soundvibe/kafka/config/AbstractConfigBuilder.java b/src/main/java/net/soundvibe/kafka/config/AbstractConfigBuilder.java index 05eb713..c45fd68 100644 --- a/src/main/java/net/soundvibe/kafka/config/AbstractConfigBuilder.java +++ b/src/main/java/net/soundvibe/kafka/config/AbstractConfigBuilder.java @@ -202,6 +202,27 @@ public final T withSecurityProviders(Class... return (T) this; } + /** + * The amount of time the client will wait for the socket connection to be established. + * If the connection is not built before the timeout elapses, clients will close the socket channel. + * Default: 10s. + */ + public T withSocketConnectionSetupTimeout(Duration socketConnectionSetupTimeout) { + props.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, socketConnectionSetupTimeout.toMillis()); + return (T) this; + } + + /** + * The maximum amount of time the client will wait for the socket connection to be established. + * The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. + * To avoid connection storms, a randomization factor of 0.2 will be applied to the timeout resulting in a random range between 20% below and 20% above the computed value. + * Default: 127s. + */ + public T withSocketConnectionSetupTimeoutMax(Duration socketConnectionSetupTimeoutMax) { + props.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, socketConnectionSetupTimeoutMax.toMillis()); + return (T) this; + } + /** * Use custom configuration property * @param name property name diff --git a/src/main/java/net/soundvibe/kafka/config/producer/ProducerConfigBuilder.java b/src/main/java/net/soundvibe/kafka/config/producer/ProducerConfigBuilder.java index 4913190..c138a0e 100644 --- a/src/main/java/net/soundvibe/kafka/config/producer/ProducerConfigBuilder.java +++ b/src/main/java/net/soundvibe/kafka/config/producer/ProducerConfigBuilder.java @@ -101,9 +101,14 @@ public ProducerConfigBuilder withMaxRequestSize(int maxRequestSize) { } /** - * The configuration controls how long KafkaProducer.send() and KafkaProducer.partitionsFor() will block. - * These methods can be blocked either because the buffer is full or metadata unavailable. - * Blocking in the user-supplied serializers or partitioner will not be counted against this timeout. + * The configuration controls how long the KafkaProducer's send(), partitionsFor(), " + * initTransactions(), sendOffsetsToTransaction(), commitTransaction() " + * and abortTransaction() methods will block. " + * For send() this timeout bounds the total time waiting for both metadata fetch and buffer allocation " + * (blocking in the user-supplied serializers or partitioner is not counted against this timeout). " + * For partitionsFor() this timeout bounds the time spent waiting for metadata if it is unavailable. " + * The transaction-related methods always block, but may timeout if " + * the transaction coordinator could not be discovered or did not respond within the timeout. */ public ProducerConfigBuilder withMaxBlock(Duration maxBlock) { props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlock.toMillis()); diff --git a/src/main/java/net/soundvibe/kafka/config/streams/StreamsConfigBuilder.java b/src/main/java/net/soundvibe/kafka/config/streams/StreamsConfigBuilder.java index 1a4bac5..6e9e7d3 100644 --- a/src/main/java/net/soundvibe/kafka/config/streams/StreamsConfigBuilder.java +++ b/src/main/java/net/soundvibe/kafka/config/streams/StreamsConfigBuilder.java @@ -73,7 +73,7 @@ public StreamsConfigBuilder withDefaultDeserializationExceptionHandler(Classorg.apache.kafka.common.serialization.Serde interface via * DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS or DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS as well */ - public StreamsConfigBuilder withDefaultKeySerde(Class defaultKeySerde) { + public StreamsConfigBuilder withDefaultKeySerde(Class> defaultKeySerde) { props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, defaultKeySerde); return this; } @@ -83,7 +83,7 @@ public StreamsConfigBuilder withDefaultKeySerde(Class defaultKe * Note when windowed serde class is used, one needs to set the inner serde class that implements the org.apache.kafka.common.serialization.Serde interface via * DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS or DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS as well */ - public StreamsConfigBuilder withDefaultValueSerde(Class defaultValueSerde) { + public StreamsConfigBuilder withDefaultValueSerde(Class> defaultValueSerde) { props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, defaultValueSerde); return this; } @@ -143,7 +143,7 @@ public StreamsConfigBuilder withProcessingGuarantee(ProcessingGuarantee processi * A configuration telling Kafka Streams if it should optimize the topology, disabled by default */ public StreamsConfigBuilder withTopologyOptimization(TopologyOptimization topologyOptimization) { - props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, topologyOptimization.name); + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, topologyOptimization.name); return this; } @@ -214,7 +214,7 @@ public StreamsConfigBuilder withStateCleanupDelay(Duration stateCleanupDelay) { * Default is null. Accepted values are UPGRADE_FROM_0100, UPGRADE_FROM_0101, UPGRADE_FROM_0102, UPGRADE_FROM_0110, UPGRADE_FROM_10, UPGRADE_FROM_11(for upgrading from the corresponding old version). */ public StreamsConfigBuilder withUpgradeFrom(UpgradeFrom upgradeFrom) { - props.put(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom == null ? upgradeFrom : upgradeFrom.version); + props.put(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom == null ? null : upgradeFrom.version); return this; } diff --git a/src/test/java/net/soundvibe/kafka/config/admin/AdminConfigBuilderTest.java b/src/test/java/net/soundvibe/kafka/config/admin/AdminConfigBuilderTest.java index 0886461..008ab11 100644 --- a/src/test/java/net/soundvibe/kafka/config/admin/AdminConfigBuilderTest.java +++ b/src/test/java/net/soundvibe/kafka/config/admin/AdminConfigBuilderTest.java @@ -33,6 +33,8 @@ void should_build_all_properties() { .withRetryBackoff(Duration.ofSeconds(5)) .withSendBufferBytes(1024) .withCustomMap(new HashMap<>()) + .withSocketConnectionSetupTimeout(Duration.ofSeconds(15)) + .withSocketConnectionSetupTimeoutMax(Duration.ofSeconds(30)) .buildProperties(); assertEquals(BOOTSTRAP_SERVERS, adminProps.getProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG)); @@ -42,4 +44,4 @@ void should_build_all_properties() { private void assertValid(Properties properties) { assertDoesNotThrow(() -> AdminClient.create(properties)); } -} \ No newline at end of file +} diff --git a/src/test/java/net/soundvibe/kafka/config/streams/StreamsConfigBuilderTest.java b/src/test/java/net/soundvibe/kafka/config/streams/StreamsConfigBuilderTest.java index cb0acdf..afb5dea 100644 --- a/src/test/java/net/soundvibe/kafka/config/streams/StreamsConfigBuilderTest.java +++ b/src/test/java/net/soundvibe/kafka/config/streams/StreamsConfigBuilderTest.java @@ -22,7 +22,6 @@ class StreamsConfigBuilderTest { private static final String BOOTSTRAP_SERVERS = "http://localhost:9876"; @Test - @DisabledOnOs(WINDOWS) void should_build_all_properties() throws IOException { Properties streamProps = StreamsConfigBuilder.create() .withBootstrapServers(BOOTSTRAP_SERVERS) @@ -73,7 +72,6 @@ void should_build_all_properties() throws IOException { } @Test - @DisabledOnOs(WINDOWS) void should_set_upgrade_from_to_null() { Properties streamProps = StreamsConfigBuilder.create() .withBootstrapServers(BOOTSTRAP_SERVERS)