Skip to content

Commit

Permalink
Merge pull request #9 from soundvibe/2.7.0
Browse files Browse the repository at this point in the history
2.7.0
  • Loading branch information
soundvibe committed Feb 4, 2021
2 parents 416042f + 6000cd4 commit 783e6af
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 33 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/maven.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: Build and run tests

on: [push, pull_request]
on: [push]

jobs:
compile:
Expand Down
18 changes: 0 additions & 18 deletions .travis.yml

This file was deleted.

4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ Binaries and dependency information for Maven, Ivy, Gradle and others can be fou
Example for Gradle:

```groovy
compile 'net.soundvibe:kafka-config:2.6.1'
compile 'net.soundvibe:kafka-config:2.7.0'
```

and for Maven:
Expand All @@ -114,7 +114,7 @@ and for Maven:
<dependency>
<groupId>net.soundvibe</groupId>
<artifactId>kafka-config</artifactId>
<version>2.6.1</version>
<version>2.7.0</version>
</dependency>
```

Expand Down
5 changes: 3 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>net.soundvibe</groupId>
<artifactId>kafka-config</artifactId>
<version>2.6.1</version>
<version>2.7.0</version>
<packaging>jar</packaging>
<name>kafka-config</name>
<description>Typesafe configuration for Kafka clients</description>
Expand Down Expand Up @@ -36,7 +36,7 @@
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<kafka.version>2.6.1</kafka.version>
<kafka.version>2.7.0</kafka.version>
</properties>

<distributionManagement>
Expand Down Expand Up @@ -154,6 +154,7 @@
<serverId>ossrh</serverId>
<nexusUrl>https://oss.sonatype.org/</nexusUrl>
<autoReleaseAfterClose>true</autoReleaseAfterClose>
<stagingProgressTimeoutMinutes>15</stagingProgressTimeoutMinutes>
</configuration>
</plugin>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,27 @@ public final T withSecurityProviders(Class<? extends SecurityProviderCreator>...
return (T) this;
}

/**
* The amount of time the client will wait for the socket connection to be established.
* If the connection is not built before the timeout elapses, clients will close the socket channel.
* Default: 10s.
*/
public T withSocketConnectionSetupTimeout(Duration socketConnectionSetupTimeout) {
props.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, socketConnectionSetupTimeout.toMillis());
return (T) this;
}

/**
* The maximum amount of time the client will wait for the socket connection to be established.
* The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum.
* To avoid connection storms, a randomization factor of 0.2 will be applied to the timeout resulting in a random range between 20% below and 20% above the computed value.
* Default: 127s.
*/
public T withSocketConnectionSetupTimeoutMax(Duration socketConnectionSetupTimeoutMax) {
props.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, socketConnectionSetupTimeoutMax.toMillis());
return (T) this;
}

/**
* Use custom configuration property
* @param name property name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,14 @@ public ProducerConfigBuilder withMaxRequestSize(int maxRequestSize) {
}

/**
* The configuration controls how long <code>KafkaProducer.send()</code> and <code>KafkaProducer.partitionsFor()</code> will block.
* These methods can be blocked either because the buffer is full or metadata unavailable.
* Blocking in the user-supplied serializers or partitioner will not be counted against this timeout.
* The configuration controls how long the <code>KafkaProducer</code>'s <code>send()</code>, <code>partitionsFor()</code>, "
* <code>initTransactions()</code>, <code>sendOffsetsToTransaction()</code>, <code>commitTransaction()</code> "
* and <code>abortTransaction()</code> methods will block. "
* For <code>send()</code> this timeout bounds the total time waiting for both metadata fetch and buffer allocation "
* (blocking in the user-supplied serializers or partitioner is not counted against this timeout). "
* For <code>partitionsFor()</code> this timeout bounds the time spent waiting for metadata if it is unavailable. "
* The transaction-related methods always block, but may timeout if "
* the transaction coordinator could not be discovered or did not respond within the timeout.
*/
public ProducerConfigBuilder withMaxBlock(Duration maxBlock) {
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlock.toMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public StreamsConfigBuilder withDefaultDeserializationExceptionHandler(Class<? e
* Note when windowed serde class is used, one needs to set the inner serde class that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface via
* DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS or DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS as well
*/
public StreamsConfigBuilder withDefaultKeySerde(Class<? extends Serde> defaultKeySerde) {
public StreamsConfigBuilder withDefaultKeySerde(Class<? extends Serde<?>> defaultKeySerde) {
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, defaultKeySerde);
return this;
}
Expand All @@ -83,7 +83,7 @@ public StreamsConfigBuilder withDefaultKeySerde(Class<? extends Serde> defaultKe
* Note when windowed serde class is used, one needs to set the inner serde class that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface via
* DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS or DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS as well
*/
public StreamsConfigBuilder withDefaultValueSerde(Class<? extends Serde> defaultValueSerde) {
public StreamsConfigBuilder withDefaultValueSerde(Class<? extends Serde<?>> defaultValueSerde) {
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, defaultValueSerde);
return this;
}
Expand Down Expand Up @@ -143,7 +143,7 @@ public StreamsConfigBuilder withProcessingGuarantee(ProcessingGuarantee processi
* A configuration telling Kafka Streams if it should optimize the topology, disabled by default
*/
public StreamsConfigBuilder withTopologyOptimization(TopologyOptimization topologyOptimization) {
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, topologyOptimization.name);
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, topologyOptimization.name);
return this;
}

Expand Down Expand Up @@ -214,7 +214,7 @@ public StreamsConfigBuilder withStateCleanupDelay(Duration stateCleanupDelay) {
* Default is null. Accepted values are UPGRADE_FROM_0100, UPGRADE_FROM_0101, UPGRADE_FROM_0102, UPGRADE_FROM_0110, UPGRADE_FROM_10, UPGRADE_FROM_11(for upgrading from the corresponding old version).
*/
public StreamsConfigBuilder withUpgradeFrom(UpgradeFrom upgradeFrom) {
props.put(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom == null ? upgradeFrom : upgradeFrom.version);
props.put(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom == null ? null : upgradeFrom.version);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ void should_build_all_properties() {
.withRetryBackoff(Duration.ofSeconds(5))
.withSendBufferBytes(1024)
.withCustomMap(new HashMap<>())
.withSocketConnectionSetupTimeout(Duration.ofSeconds(15))
.withSocketConnectionSetupTimeoutMax(Duration.ofSeconds(30))
.buildProperties();

assertEquals(BOOTSTRAP_SERVERS, adminProps.getProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
Expand All @@ -42,4 +44,4 @@ void should_build_all_properties() {
private void assertValid(Properties properties) {
assertDoesNotThrow(() -> AdminClient.create(properties));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ class StreamsConfigBuilderTest {
private static final String BOOTSTRAP_SERVERS = "http://localhost:9876";

@Test
@DisabledOnOs(WINDOWS)
void should_build_all_properties() throws IOException {
Properties streamProps = StreamsConfigBuilder.create()
.withBootstrapServers(BOOTSTRAP_SERVERS)
Expand Down Expand Up @@ -73,7 +72,6 @@ void should_build_all_properties() throws IOException {
}

@Test
@DisabledOnOs(WINDOWS)
void should_set_upgrade_from_to_null() {
Properties streamProps = StreamsConfigBuilder.create()
.withBootstrapServers(BOOTSTRAP_SERVERS)
Expand Down

0 comments on commit 783e6af

Please sign in to comment.