From eee40938cb22b3fe3d4718390fa0e8d97b91cf9a Mon Sep 17 00:00:00 2001 From: Sebastien Viale Date: Mon, 29 Jan 2024 09:04:44 +0100 Subject: [PATCH] Add documentation about stream deduplication #130 (#151) * add deduplication doc #130 * Improve deduplication documentation * Fix bad indent --- README.md | 106 ++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 92 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 142b03c..a849479 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ -

Kstreamplify

- +# Kstreamplify + [![GitHub Build](https://img.shields.io/github/actions/workflow/status/michelin/kstreamplify/on_push_main.yml?branch=main&logo=github&style=for-the-badge)](https://img.shields.io/github/actions/workflow/status/michelin/kstreamplify/on_push_main.yml) [![Sonatype Nexus (Releases)](https://img.shields.io/nexus/r/com.michelin/kstreamplify?server=https%3A%2F%2Fs01.oss.sonatype.org%2F&style=for-the-badge&logo=sonatype)](https://central.sonatype.com/search?q=com.michelin.kstreamplify&sort=name) [![GitHub release](https://img.shields.io/github/v/release/michelin/kstreamplify?logo=github&style=for-the-badge)](https://github.com/michelin/kstreamplify/releases) @@ -10,11 +10,14 @@ [![SonarCloud Tests](https://img.shields.io/sonar/tests/michelin_kstreamplify/main?server=https%3A%2F%2Fsonarcloud.io&style=for-the-badge&logo=sonarcloud)](https://sonarcloud.io/component_measures?metric=tests&view=list&id=michelin_kstreamplify) [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg?logo=apache&style=for-the-badge)](https://opensource.org/licenses/Apache-2.0) -Are you looking to enhance your development experience and accelerate the implementation of Kafka Streams? Look no further – Kstreamplify is tailor-made for you! +Are you looking to enhance your development experience and accelerate the implementation of Kafka Streams? Look no +further – Kstreamplify is tailor-made for you! -**Kstreamplify** is a Java library that empowers you to swiftly create Kafka Streams-based applications, offering a host of additional advanced features. +**Kstreamplify** is a Java library that empowers you to swiftly create Kafka Streams-based applications, offering a host +of additional advanced features. -With Kstreamplify, you can declare your KafkaStreams class and define your topology with minimal effort. Here's all you need to do: +With Kstreamplify, you can declare your KafkaStreams class and define your topology with minimal effort. Here's all you +need to do: @@ -22,9 +25,9 @@ With Kstreamplify, you can declare your KafkaStreams class and define your topol * [Features](#features) * [Dependencies](#dependencies) - * [Java](#java) - * [Spring Boot](#spring-boot) - * [Unit Test](#unit-test) + * [Java](#java) + * [Spring Boot](#spring-boot) + * [Unit Test](#unit-test) * [Getting Started](#getting-started) * [Properties Injection](#properties-injection) * [Avro Serializer and Deserializer](#avro-serializer-and-deserializer) @@ -35,6 +38,10 @@ With Kstreamplify, you can declare your KafkaStreams class and define your topol * [REST Endpoints](#rest-endpoints) * [Hooks](#hooks) * [On Start](#on-start) + * [Deduplication](#deduplication) + * [By Key](#by-key) + * [By Key and Value](#by-key-and-value) + * [By Predicate](#by-predicate) * [Interactive Queries](#interactive-queries) * [Testing](#testing) * [Motivation](#motivation) @@ -66,9 +73,9 @@ To include the core Kstreamplify library in your project, add the following depe ```xml - com.michelin - kstreamplify-core - ${kstreamplify.version} + com.michelin + kstreamplify-core + ${kstreamplify.version} ``` @@ -76,10 +83,10 @@ To include the core Kstreamplify library in your project, add the following depe [![javadoc](https://javadoc.io/badge2/com.michelin/kstreamplify-spring-boot/javadoc.svg?style=for-the-badge&)](https://javadoc.io/doc/com.michelin/kstreamplify-spring-boot) -If you're using Spring Boot, you can integrate Kstreamplify with your Spring Boot application by adding the following dependency: +If you're using Spring Boot, you can integrate Kstreamplify with your Spring Boot application by adding the following +dependency: ```xml - com.michelin kstreamplify-spring-boot @@ -106,7 +113,8 @@ For both Java and Spring Boot dependencies, a testing dependency is available to ## Getting Started -To begin using Kstreamplify, you simply need to set up a `KafkaStreamsStarter` bean within you Spring Boot context, overriding the `topology` method. +To begin using Kstreamplify, you simply need to set up a `KafkaStreamsStarter` bean within you Spring Boot context, +overriding the `topology` method. For instance, you can start by creating a class annotated with `@Component`: @@ -247,6 +255,76 @@ Streams instance as a parameter. You can use this hook to perform any custom initialization or setup tasks for your Kafka Streams application. +### Deduplication + +Kstreamplify facilitates deduplication of a stream based on various criteria using window stores within a specified time +frame. + +The `DeduplicationUtils` class provides three deduplication implementations. Each deduplication method takes a duration +parameter that specifies how long a record will be kept in the window store for deduplication. + +All deduplication methods return a `KStream, ProcessingResult`. You may want to direct the result to +the `TopologyErrorHandler.catchErrors()` method. + +**Note**: Only streams with String keys and Avro values are supported. + +#### By Key + +```java + +@Component +public class MyKafkaStreams extends KafkaStreamsStarter { + @Override + public void topology(StreamsBuilder streamsBuilder) { + KStream myStream = streamsBuilder + .stream("myTopic"); + + DeduplicationUtils + .deduplicateKeys(streamsBuilder, myStream, Duration.ofDays(60)) + .to("myTopicDeduplicated"); + } +} +``` + +#### By Key and Value + +```java + +@Component +public class MyKafkaStreams extends KafkaStreamsStarter { + @Override + public void topology(StreamsBuilder streamsBuilder) { + KStream myStream = streamsBuilder + .stream("myTopic"); + + DeduplicationUtils + .deduplicateKeyValues(streamsBuilder, myStream, Duration.ofDays(60)) + .to("myTopicDeduplicated"); + } +} +``` + +#### By Predicate + +```java + +@Component +public class MyKafkaStreams extends KafkaStreamsStarter { + @Override + public void topology(StreamsBuilder streamsBuilder) { + KStream myStream = streamsBuilder + .stream("myTopic"); + + DeduplicationUtils + .deduplicateWithPredicate(streamsBuilder, myStream, Duration.ofDays(60), + value -> value.getFirstName() + "#" + value.getLastName()) + .to("myTopicDeduplicated"); + } +} +``` + +The given predicate will be used as a key in the window store. The stream will be deduplicated based on the predicate. + ### Interactive Queries Kstreamplify is designed to make your Kafka Streams instance ready