diff --git a/README.md b/README.md
index 142b03c..a849479 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,5 @@
-
Kstreamplify
-
+# Kstreamplify
+
[![GitHub Build](https://img.shields.io/github/actions/workflow/status/michelin/kstreamplify/on_push_main.yml?branch=main&logo=github&style=for-the-badge)](https://img.shields.io/github/actions/workflow/status/michelin/kstreamplify/on_push_main.yml)
[![Sonatype Nexus (Releases)](https://img.shields.io/nexus/r/com.michelin/kstreamplify?server=https%3A%2F%2Fs01.oss.sonatype.org%2F&style=for-the-badge&logo=sonatype)](https://central.sonatype.com/search?q=com.michelin.kstreamplify&sort=name)
[![GitHub release](https://img.shields.io/github/v/release/michelin/kstreamplify?logo=github&style=for-the-badge)](https://github.com/michelin/kstreamplify/releases)
@@ -10,11 +10,14 @@
[![SonarCloud Tests](https://img.shields.io/sonar/tests/michelin_kstreamplify/main?server=https%3A%2F%2Fsonarcloud.io&style=for-the-badge&logo=sonarcloud)](https://sonarcloud.io/component_measures?metric=tests&view=list&id=michelin_kstreamplify)
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg?logo=apache&style=for-the-badge)](https://opensource.org/licenses/Apache-2.0)
-Are you looking to enhance your development experience and accelerate the implementation of Kafka Streams? Look no further – Kstreamplify is tailor-made for you!
+Are you looking to enhance your development experience and accelerate the implementation of Kafka Streams? Look no
+further – Kstreamplify is tailor-made for you!
-**Kstreamplify** is a Java library that empowers you to swiftly create Kafka Streams-based applications, offering a host of additional advanced features.
+**Kstreamplify** is a Java library that empowers you to swiftly create Kafka Streams-based applications, offering a host
+of additional advanced features.
-With Kstreamplify, you can declare your KafkaStreams class and define your topology with minimal effort. Here's all you need to do:
+With Kstreamplify, you can declare your KafkaStreams class and define your topology with minimal effort. Here's all you
+need to do:
@@ -22,9 +25,9 @@ With Kstreamplify, you can declare your KafkaStreams class and define your topol
* [Features](#features)
* [Dependencies](#dependencies)
- * [Java](#java)
- * [Spring Boot](#spring-boot)
- * [Unit Test](#unit-test)
+ * [Java](#java)
+ * [Spring Boot](#spring-boot)
+ * [Unit Test](#unit-test)
* [Getting Started](#getting-started)
* [Properties Injection](#properties-injection)
* [Avro Serializer and Deserializer](#avro-serializer-and-deserializer)
@@ -35,6 +38,10 @@ With Kstreamplify, you can declare your KafkaStreams class and define your topol
* [REST Endpoints](#rest-endpoints)
* [Hooks](#hooks)
* [On Start](#on-start)
+ * [Deduplication](#deduplication)
+ * [By Key](#by-key)
+ * [By Key and Value](#by-key-and-value)
+ * [By Predicate](#by-predicate)
* [Interactive Queries](#interactive-queries)
* [Testing](#testing)
* [Motivation](#motivation)
@@ -66,9 +73,9 @@ To include the core Kstreamplify library in your project, add the following depe
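```xml
<dependency>
- <groupId>com.michelin</groupId>
- <artifactId>kstreamplify-core</artifactId>
- <version>${kstreamplify.version}</version>
+ <groupId>com.michelin</groupId>
+ <artifactId>kstreamplify-core</artifactId>
+ <version>${kstreamplify.version}</version>
</dependency>
```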
@@ -76,10 +83,10 @@ To include the core Kstreamplify library in your project, add the following depe
[![javadoc](https://javadoc.io/badge2/com.michelin/kstreamplify-spring-boot/javadoc.svg?style=for-the-badge&)](https://javadoc.io/doc/com.michelin/kstreamplify-spring-boot)
-If you're using Spring Boot, you can integrate Kstreamplify with your Spring Boot application by adding the following dependency:
+If you're using Spring Boot, you can integrate Kstreamplify with your Spring Boot application by adding the following
+dependency:
```xml
-
<groupId>com.michelin</groupId>
<artifactId>kstreamplify-spring-boot</artifactId>
@@ -106,7 +113,8 @@ For both Java and Spring Boot dependencies, a testing dependency is available to
## Getting Started
-To begin using Kstreamplify, you simply need to set up a `KafkaStreamsStarter` bean within you Spring Boot context, overriding the `topology` method.
+To begin using Kstreamplify, you simply need to set up a `KafkaStreamsStarter` bean within your Spring Boot context,
+overriding the `topology` method.
For instance, you can start by creating a class annotated with `@Component`:
@@ -247,6 +255,76 @@ Streams instance as a parameter.
You can use this hook to perform any custom initialization or setup tasks for your Kafka Streams application.
+### Deduplication
+
+Kstreamplify can deduplicate a stream on several criteria. It uses a window store to remember the records seen within a
+specified time frame.
+
+The `DeduplicationUtils` class provides three deduplication implementations. Each deduplication method takes a duration
+parameter that specifies how long a record will be kept in the window store for deduplication.
+
+All deduplication methods return a `KStream` with `String` keys and `ProcessingResult` values. You may want to direct
+the result to the `TopologyErrorHandler.catchErrors()` method.
+
+**Note**: Only streams with String keys and Avro values are supported.
+
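The retention idea described above can be sketched in plain Java without Kafka. This is only an illustration under stated assumptions: `WindowedDeduplicator` and `isDuplicate` are hypothetical names, a `HashMap` stands in for the window store, and the rule of not refreshing an entry when a duplicate arrives is an assumption, not a description of how `DeduplicationUtils` is implemented.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a window store: remembers when each key was first seen. */
class WindowedDeduplicator {
    private final long retentionMs;
    private final Map<String, Long> firstSeenAt = new HashMap<>();

    WindowedDeduplicator(Duration retention) {
        this.retentionMs = retention.toMillis();
    }

    /** Returns true if the key was already seen less than the retention period ago. */
    boolean isDuplicate(String key, long timestampMs) {
        Long seen = firstSeenAt.get(key);
        if (seen != null && timestampMs - seen < retentionMs) {
            return true; // still inside the retention period: the record would be dropped
        }
        firstSeenAt.put(key, timestampMs); // first sighting, or the previous entry has expired
        return false;
    }

    public static void main(String[] args) {
        WindowedDeduplicator dedup = new WindowedDeduplicator(Duration.ofDays(60));
        long day = Duration.ofDays(1).toMillis();
        System.out.println(dedup.isDuplicate("key-1", 0));        // false: first sighting
        System.out.println(dedup.isDuplicate("key-1", 10 * day)); // true: within 60 days
        System.out.println(dedup.isDuplicate("key-1", 61 * day)); // false: entry expired
    }
}
```

A longer duration catches duplicates that arrive further apart, at the cost of keeping more entries in the store.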
+#### By Key
+
+```java
+
+@Component
+public class MyKafkaStreams extends KafkaStreamsStarter {
+    @Override
+    public void topology(StreamsBuilder streamsBuilder) {
+        KStream myStream = streamsBuilder
+                .stream("myTopic");
+
+        DeduplicationUtils
+                .deduplicateKeys(streamsBuilder, myStream, Duration.ofDays(60))
+                .to("myTopicDeduplicated");
+    }
+}
+```
+
+#### By Key and Value
+
+```java
+
+@Component
+public class MyKafkaStreams extends KafkaStreamsStarter {
+    @Override
+    public void topology(StreamsBuilder streamsBuilder) {
+        KStream myStream = streamsBuilder
+                .stream("myTopic");
+
+        DeduplicationUtils
+                .deduplicateKeyValues(streamsBuilder, myStream, Duration.ofDays(60))
+                .to("myTopicDeduplicated");
+    }
+}
+```
+
+#### By Predicate
+
+```java
+
+@Component
+public class MyKafkaStreams extends KafkaStreamsStarter {
+    @Override
+    public void topology(StreamsBuilder streamsBuilder) {
+        KStream myStream = streamsBuilder
+                .stream("myTopic");
+
+        DeduplicationUtils
+                .deduplicateWithPredicate(streamsBuilder, myStream, Duration.ofDays(60),
+                        value -> value.getFirstName() + "#" + value.getLastName())
+                .to("myTopicDeduplicated");
+    }
+}
+```
+
+The value returned by the given predicate is used as the key in the window store. Records for which the predicate
+returns the same value within the time frame are treated as duplicates.
+
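To make the effect of deduplicating on a derived identifier concrete, here is a minimal plain-Java sketch. It is not the library's code: `DeduplicationCriteriaSketch`, `Person`, and `keepFirstPerIdentifier` are hypothetical names, `Person` stands in for an Avro value, and the time window is deliberately left out so that only the identifier logic is shown.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

class DeduplicationCriteriaSketch {

    /** Hypothetical value type; a real stream would carry an Avro record here. */
    static final class Person {
        private final String firstName;
        private final String lastName;

        Person(String firstName, String lastName) {
            this.firstName = firstName;
            this.lastName = lastName;
        }

        String getFirstName() { return firstName; }

        String getLastName() { return lastName; }
    }

    /** Keeps only the first value seen for each identifier returned by the function. */
    static <V> List<V> keepFirstPerIdentifier(List<V> values, Function<V, String> identifier) {
        Set<String> seen = new HashSet<>();
        List<V> kept = new ArrayList<>();
        for (V value : values) {
            // Set.add returns false when the identifier is already present.
            if (seen.add(identifier.apply(value))) {
                kept.add(value);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Person> people = List.of(
                new Person("Ada", "Lovelace"),
                new Person("Alan", "Turing"),
                new Person("Ada", "Lovelace"));

        List<Person> kept = keepFirstPerIdentifier(
                people, p -> p.getFirstName() + "#" + p.getLastName());

        System.out.println(kept.size()); // 2: the second "Ada#Lovelace" is dropped
    }
}
```

Joining the fields with a separator such as `#` keeps distinct field combinations from collapsing into the same identifier, for example `"ab" + "c"` and `"a" + "bc"`.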
### Interactive Queries
Kstreamplify is designed to make your Kafka Streams instance ready