Skip to content

Commit

Permalink
add support for Kafka Streams v2.5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
fhussonnois committed May 11, 2020
1 parent 6dbed7c commit eb9d558
Show file tree
Hide file tree
Showing 9 changed files with 32 additions and 7 deletions.
1 change: 1 addition & 0 deletions azkarra-api/pom.xml
Expand Up @@ -80,6 +80,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
Expand Down
Expand Up @@ -519,8 +519,14 @@ Logger logger() {
return LOG;
}

/**
* Checks if the {@link KafkaStreams} is neither RUNNING nor REBALANCING.
*/
public boolean isNotRunning() {
return !kafkaStreams.state().isRunning();
// This is equivalent to the KafkaStreams methods :
// State.isRunning() <= 2.4 or State.isRunningOrRebalancing() >= 2.5
final KafkaStreams.State state = kafkaStreams.state();
return !(state.equals(KafkaStreams.State.RUNNING) || state.equals(KafkaStreams.State.REBALANCING));
}

/**
Expand Down
Expand Up @@ -41,7 +41,7 @@
</licenses>

<properties>
<kafka.streams.version>2.4.0</kafka.streams.version>
<kafka.streams.version>2.5.0</kafka.streams.version>
<log4j.version>2.12.1</log4j.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
Expand Down Expand Up @@ -111,8 +111,13 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.3.0</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand Down
2 changes: 1 addition & 1 deletion azkarra-examples/pom.xml
Expand Up @@ -80,7 +80,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.streams.version}</version>
<scope>compile</scope>
</dependency>

<dependency>
Expand Down
1 change: 0 additions & 1 deletion azkarra-json-serializers/pom.xml
Expand Up @@ -47,7 +47,6 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
Expand Down
5 changes: 5 additions & 0 deletions azkarra-metrics/pom.xml
Expand Up @@ -62,6 +62,11 @@
<version>1</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>

</dependencies>

</project>
6 changes: 6 additions & 0 deletions azkarra-server/pom.xml
Expand Up @@ -228,6 +228,12 @@
<artifactId>mockito-all</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<scope>test</scope>
</dependency>

<!-- END dependencies for testing-->
</dependencies>
</project>
Expand Up @@ -42,6 +42,7 @@

import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

Expand Down Expand Up @@ -105,13 +106,14 @@ public void shouldQueryRemoteServerSuccessfully() throws ExecutionException, Int
}

private QueryResult<String, String> newQueryResult() {
List<KV<String, String>> kv = singletonList(KV.of("k1", "v1"));
return QueryResultBuilder.<String, String>newBuilder()
.setServer(SERVER_INFO.hostAndPort())
.setStatus(QueryStatus.SUCCESS)
.setStoreName(TEST_STORE_NAME)
.setSuccessResultSet(
singletonList(
new SuccessResultSet<>(SERVER_INFO.hostAndPort(), true, singletonList(KV.of("k1", "v1")))
new SuccessResultSet<>(SERVER_INFO.hostAndPort(), true, kv)
)
)
.setTook(0)
Expand Down
3 changes: 2 additions & 1 deletion pom.xml
Expand Up @@ -85,7 +85,7 @@
</distributionManagement>

<properties>
<kafka.streams.version>2.4.0</kafka.streams.version>
<kafka.streams.version>2.5.0</kafka.streams.version>
<slf4j.version>1.7.28</slf4j.version>
<log4j.version>2.12.1</log4j.version>
<junit.version>4.12</junit.version>
Expand Down Expand Up @@ -248,6 +248,7 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.streams.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
Expand Down

0 comments on commit eb9d558

Please sign in to comment.