Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.7.0 #9

Merged
merged 8 commits into from
Feb 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/maven.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: Build and run tests

on: [push, pull_request]
on: [push]

jobs:
compile:
Expand Down
18 changes: 0 additions & 18 deletions .travis.yml

This file was deleted.

4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ Binaries and dependency information for Maven, Ivy, Gradle and others can be fou
Example for Gradle:

```groovy
compile 'net.soundvibe:kafka-config:2.6.1'
compile 'net.soundvibe:kafka-config:2.7.0'
```

and for Maven:
Expand All @@ -114,7 +114,7 @@ and for Maven:
<dependency>
<groupId>net.soundvibe</groupId>
<artifactId>kafka-config</artifactId>
<version>2.6.1</version>
<version>2.7.0</version>
</dependency>
```

Expand Down
5 changes: 3 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>net.soundvibe</groupId>
<artifactId>kafka-config</artifactId>
<version>2.6.1</version>
<version>2.7.0</version>
<packaging>jar</packaging>
<name>kafka-config</name>
<description>Typesafe configuration for Kafka clients</description>
Expand Down Expand Up @@ -36,7 +36,7 @@
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<kafka.version>2.6.1</kafka.version>
<kafka.version>2.7.0</kafka.version>
</properties>

<distributionManagement>
Expand Down Expand Up @@ -154,6 +154,7 @@
<serverId>ossrh</serverId>
<nexusUrl>https://oss.sonatype.org/</nexusUrl>
<autoReleaseAfterClose>true</autoReleaseAfterClose>
<stagingProgressTimeoutMinutes>15</stagingProgressTimeoutMinutes>
</configuration>
</plugin>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,27 @@ public final T withSecurityProviders(Class<? extends SecurityProviderCreator>...
return (T) this;
}

/**
* The amount of time the client will wait for the socket connection to be established.
* If the connection is not built before the timeout elapses, clients will close the socket channel.
* Default: 10s.
*/
public T withSocketConnectionSetupTimeout(Duration socketConnectionSetupTimeout) {
props.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, socketConnectionSetupTimeout.toMillis());
return (T) this;
}

/**
* The maximum amount of time the client will wait for the socket connection to be established.
* The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum.
* To avoid connection storms, a randomization factor of 0.2 will be applied to the timeout resulting in a random range between 20% below and 20% above the computed value.
* Default: 127s.
*/
public T withSocketConnectionSetupTimeoutMax(Duration socketConnectionSetupTimeoutMax) {
props.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, socketConnectionSetupTimeoutMax.toMillis());
return (T) this;
}

/**
* Use custom configuration property
* @param name property name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,14 @@ public ProducerConfigBuilder withMaxRequestSize(int maxRequestSize) {
}

/**
* The configuration controls how long <code>KafkaProducer.send()</code> and <code>KafkaProducer.partitionsFor()</code> will block.
* These methods can be blocked either because the buffer is full or metadata unavailable.
* Blocking in the user-supplied serializers or partitioner will not be counted against this timeout.
* The configuration controls how long the <code>KafkaProducer</code>'s <code>send()</code>, <code>partitionsFor()</code>, "
* <code>initTransactions()</code>, <code>sendOffsetsToTransaction()</code>, <code>commitTransaction()</code> "
* and <code>abortTransaction()</code> methods will block. "
* For <code>send()</code> this timeout bounds the total time waiting for both metadata fetch and buffer allocation "
* (blocking in the user-supplied serializers or partitioner is not counted against this timeout). "
* For <code>partitionsFor()</code> this timeout bounds the time spent waiting for metadata if it is unavailable. "
* The transaction-related methods always block, but may timeout if "
* the transaction coordinator could not be discovered or did not respond within the timeout.
*/
public ProducerConfigBuilder withMaxBlock(Duration maxBlock) {
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlock.toMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public StreamsConfigBuilder withDefaultDeserializationExceptionHandler(Class<? e
* Note when windowed serde class is used, one needs to set the inner serde class that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface via
* DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS or DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS as well
*/
public StreamsConfigBuilder withDefaultKeySerde(Class<? extends Serde> defaultKeySerde) {
public StreamsConfigBuilder withDefaultKeySerde(Class<? extends Serde<?>> defaultKeySerde) {
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, defaultKeySerde);
return this;
}
Expand All @@ -83,7 +83,7 @@ public StreamsConfigBuilder withDefaultKeySerde(Class<? extends Serde> defaultKe
* Note when windowed serde class is used, one needs to set the inner serde class that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface via
* DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS or DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS as well
*/
public StreamsConfigBuilder withDefaultValueSerde(Class<? extends Serde> defaultValueSerde) {
public StreamsConfigBuilder withDefaultValueSerde(Class<? extends Serde<?>> defaultValueSerde) {
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, defaultValueSerde);
return this;
}
Expand Down Expand Up @@ -143,7 +143,7 @@ public StreamsConfigBuilder withProcessingGuarantee(ProcessingGuarantee processi
* A configuration telling Kafka Streams if it should optimize the topology, disabled by default
*/
public StreamsConfigBuilder withTopologyOptimization(TopologyOptimization topologyOptimization) {
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, topologyOptimization.name);
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, topologyOptimization.name);
return this;
}

Expand Down Expand Up @@ -214,7 +214,7 @@ public StreamsConfigBuilder withStateCleanupDelay(Duration stateCleanupDelay) {
* Default is null. Accepted values are UPGRADE_FROM_0100, UPGRADE_FROM_0101, UPGRADE_FROM_0102, UPGRADE_FROM_0110, UPGRADE_FROM_10, UPGRADE_FROM_11(for upgrading from the corresponding old version).
*/
public StreamsConfigBuilder withUpgradeFrom(UpgradeFrom upgradeFrom) {
props.put(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom == null ? upgradeFrom : upgradeFrom.version);
props.put(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom == null ? null : upgradeFrom.version);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ void should_build_all_properties() {
.withRetryBackoff(Duration.ofSeconds(5))
.withSendBufferBytes(1024)
.withCustomMap(new HashMap<>())
.withSocketConnectionSetupTimeout(Duration.ofSeconds(15))
.withSocketConnectionSetupTimeoutMax(Duration.ofSeconds(30))
.buildProperties();

assertEquals(BOOTSTRAP_SERVERS, adminProps.getProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
Expand All @@ -42,4 +44,4 @@ void should_build_all_properties() {
private void assertValid(Properties properties) {
assertDoesNotThrow(() -> AdminClient.create(properties));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ class StreamsConfigBuilderTest {
private static final String BOOTSTRAP_SERVERS = "http://localhost:9876";

@Test
@DisabledOnOs(WINDOWS)
void should_build_all_properties() throws IOException {
Properties streamProps = StreamsConfigBuilder.create()
.withBootstrapServers(BOOTSTRAP_SERVERS)
Expand Down Expand Up @@ -73,7 +72,6 @@ void should_build_all_properties() throws IOException {
}

@Test
@DisabledOnOs(WINDOWS)
void should_set_upgrade_from_to_null() {
Properties streamProps = StreamsConfigBuilder.create()
.withBootstrapServers(BOOTSTRAP_SERVERS)
Expand Down