Skip to content

Commit

Permalink
Add documentation about stream deduplication #130 (#151)
Browse files Browse the repository at this point in the history
* add deduplication doc #130

* Improve deduplication documentation

* Fix bad indent
  • Loading branch information
sebastienviale committed Jan 29, 2024
1 parent f1e8da9 commit eee4093
Showing 1 changed file with 92 additions and 14 deletions.
106 changes: 92 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<h1> Kstreamplify </h1>
# Kstreamplify

[![GitHub Build](https://img.shields.io/github/actions/workflow/status/michelin/kstreamplify/on_push_main.yml?branch=main&logo=github&style=for-the-badge)](https://img.shields.io/github/actions/workflow/status/michelin/kstreamplify/on_push_main.yml)
[![Sonatype Nexus (Releases)](https://img.shields.io/nexus/r/com.michelin/kstreamplify?server=https%3A%2F%2Fs01.oss.sonatype.org%2F&style=for-the-badge&logo=sonatype)](https://central.sonatype.com/search?q=com.michelin.kstreamplify&sort=name)
[![GitHub release](https://img.shields.io/github/v/release/michelin/kstreamplify?logo=github&style=for-the-badge)](https://github.com/michelin/kstreamplify/releases)
Expand All @@ -10,21 +10,24 @@
[![SonarCloud Tests](https://img.shields.io/sonar/tests/michelin_kstreamplify/main?server=https%3A%2F%2Fsonarcloud.io&style=for-the-badge&logo=sonarcloud)](https://sonarcloud.io/component_measures?metric=tests&view=list&id=michelin_kstreamplify)
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg?logo=apache&style=for-the-badge)](https://opensource.org/licenses/Apache-2.0)

Are you looking to enhance your development experience and accelerate the implementation of Kafka Streams? Look no further – Kstreamplify is tailor-made for you!
Are you looking to enhance your development experience and accelerate the implementation of Kafka Streams? Look no
further – Kstreamplify is tailor-made for you!

**Kstreamplify** is a Java library that empowers you to swiftly create Kafka Streams-based applications, offering a host of additional advanced features.
**Kstreamplify** is a Java library that empowers you to swiftly create Kafka Streams-based applications, offering a host
of additional advanced features.

With Kstreamplify, you can declare your KafkaStreams class and define your topology with minimal effort. Here's all you need to do:
With Kstreamplify, you can declare your KafkaStreams class and define your topology with minimal effort. Here's all you
need to do:

<img src=".readme/gif/topology.gif" />

## Table of Contents

* [Features](#features)
* [Dependencies](#dependencies)
* [Java](#java)
* [Spring Boot](#spring-boot)
* [Unit Test](#unit-test)
* [Java](#java)
* [Spring Boot](#spring-boot)
* [Unit Test](#unit-test)
* [Getting Started](#getting-started)
* [Properties Injection](#properties-injection)
* [Avro Serializer and Deserializer](#avro-serializer-and-deserializer)
Expand All @@ -35,6 +38,10 @@ With Kstreamplify, you can declare your KafkaStreams class and define your topol
* [REST Endpoints](#rest-endpoints)
* [Hooks](#hooks)
* [On Start](#on-start)
* [Deduplication](#deduplication)
* [By Key](#by-key)
* [By Key and Value](#by-key-and-value)
* [By Predicate](#by-predicate)
* [Interactive Queries](#interactive-queries)
* [Testing](#testing)
* [Motivation](#motivation)
Expand Down Expand Up @@ -66,20 +73,20 @@ To include the core Kstreamplify library in your project, add the following depe

```xml
<dependency>
<groupId>com.michelin</groupId>
<artifactId>kstreamplify-core</artifactId>
<version>${kstreamplify.version}</version>
<groupId>com.michelin</groupId>
<artifactId>kstreamplify-core</artifactId>
<version>${kstreamplify.version}</version>
</dependency>
```

### Spring Boot

[![javadoc](https://javadoc.io/badge2/com.michelin/kstreamplify-spring-boot/javadoc.svg?style=for-the-badge&)](https://javadoc.io/doc/com.michelin/kstreamplify-spring-boot)

If you're using Spring Boot, you can integrate Kstreamplify with your Spring Boot application by adding the following dependency:
If you're using Spring Boot, you can integrate Kstreamplify with your Spring Boot application by adding the following
dependency:

```xml

<dependency>
<groupId>com.michelin</groupId>
<artifactId>kstreamplify-spring-boot</artifactId>
Expand All @@ -106,7 +113,8 @@ For both Java and Spring Boot dependencies, a testing dependency is available to

## Getting Started

To begin using Kstreamplify, you simply need to set up a `KafkaStreamsStarter` bean within you Spring Boot context, overriding the `topology` method.
To begin using Kstreamplify, you simply need to set up a `KafkaStreamsStarter` bean within you Spring Boot context,
overriding the `topology` method.

For instance, you can start by creating a class annotated with `@Component`:

Expand Down Expand Up @@ -247,6 +255,76 @@ Streams instance as a parameter.

You can use this hook to perform any custom initialization or setup tasks for your Kafka Streams application.

### Deduplication

Kstreamplify facilitates deduplication of a stream based on various criteria using window stores within a specified time
frame.

The `DeduplicationUtils` class provides three deduplication implementations. Each deduplication method takes a duration
parameter that specifies how long a record will be kept in the window store for deduplication.

All deduplication methods return a `KStream<String>, ProcessingResult<V, V2>`. You may want to direct the result to
the `TopologyErrorHandler.catchErrors()` method.

**Note**: Only streams with String keys and Avro values are supported.

#### By Key

```java

@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
@Override
public void topology(StreamsBuilder streamsBuilder) {
KStream<String, KafkaPerson> myStream = streamsBuilder
.stream("myTopic");

DeduplicationUtils
.deduplicateKeys(streamsBuilder, myStream, Duration.ofDays(60))
.to("myTopicDeduplicated");
}
}
```

#### By Key and Value

```java

@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
@Override
public void topology(StreamsBuilder streamsBuilder) {
KStream<String, KafkaPerson> myStream = streamsBuilder
.stream("myTopic");

DeduplicationUtils
.deduplicateKeyValues(streamsBuilder, myStream, Duration.ofDays(60))
.to("myTopicDeduplicated");
}
}
```

#### By Predicate

```java

@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
@Override
public void topology(StreamsBuilder streamsBuilder) {
KStream<String, KafkaPerson> myStream = streamsBuilder
.stream("myTopic");

DeduplicationUtils
.deduplicateWithPredicate(streamsBuilder, myStream, Duration.ofDays(60),
value -> value.getFirstName() + "#" + value.getLastName())
.to("myTopicDeduplicated");
}
}
```

The given predicate will be used as a key in the window store. The stream will be deduplicated based on the predicate.

### Interactive Queries

Kstreamplify is designed to make your Kafka Streams instance ready
Expand Down

0 comments on commit eee4093

Please sign in to comment.