Use window stores in deduplication processors #134

Merged · 3 commits · Dec 21, 2023
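
This PR swaps the deduplication state stores: instead of a TimestampedKeyValueStore purged by an hourly wall-clock punctuator, both processors now use a WindowStore whose retention period expires old entries on its own. The duplicate check becomes a windowed fetch around the record timestamp. A minimal sketch of the shared pattern, with `store` and `retention` standing in for the fields introduced in the diffs below:

// Sketch of the new duplicate check; `store` and `retention` stand in for
// the dedupWindowStore and retentionWindowDuration fields below.
Instant ts = Instant.ofEpochMilli(message.timestamp());
try (WindowStoreIterator<String> iterator =
        store.backwardFetch(message.key(), ts.minus(retention), ts.plus(retention))) {
    while (iterator != null && iterator.hasNext()) {
        if (message.key().equals(iterator.next().value)) {
            return; // already seen within the retention window, drop the duplicate
        }
    }
}
store.put(message.key(), message.key(), message.timestamp()); // first sighting wins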
DedupKeyProcessor.java
@@ -2,21 +2,20 @@

import com.michelin.kstreamplify.error.ProcessingResult;
import java.time.Duration;
+import java.time.Instant;
import org.apache.avro.specific.SpecificRecord;
-import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStore;

/**
* Transformer class for the deduplication mechanism on keys of a given topic.
*
* @param <V> The type of the value
*/
public class DedupKeyProcessor<V extends SpecificRecord>
    implements Processor<String, V, String, ProcessingResult<V, V>> {

/**
* Kstream context for this transformer.
@@ -26,12 +25,12 @@ public class DedupKeyProcessor<V extends SpecificRecord>
/**
* Window store containing all the records seen within the given window.
*/
-    private TimestampedKeyValueStore<String, String> dedupTimestampedStore;
+    private WindowStore<String, String> dedupWindowStore;

/**
* Window store name, initialized at construction.
*/
-    private final String dedupStoreName;
+    private final String windowStoreName;

/**
* Retention window for the statestore. Used for fetching data.
@@ -41,52 +40,46 @@ public class DedupKeyProcessor<V extends SpecificRecord>
/**
* Constructor.
*
-     * @param dedupStoreName The name of the constructor
+     * @param windowStoreName The name of the window store
* @param retentionWindowDuration The retentionWindow Duration
*/
-    public DedupKeyProcessor(String dedupStoreName, Duration retentionWindowDuration) {
-        this.dedupStoreName = dedupStoreName;
+    public DedupKeyProcessor(String windowStoreName, Duration retentionWindowDuration) {
+        this.windowStoreName = windowStoreName;
this.retentionWindowDuration = retentionWindowDuration;
}

@Override
public void init(ProcessorContext<String, ProcessingResult<V, V>> context) {
processorContext = context;

-        dedupTimestampedStore = this.processorContext.getStateStore(dedupStoreName);
-
-        processorContext.schedule(Duration.ofHours(1), PunctuationType.WALL_CLOCK_TIME,
-            currentTimestamp -> {
-                try (var iterator = dedupTimestampedStore.all()) {
-                    while (iterator.hasNext()) {
-                        var currentRecord = iterator.next();
-                        if (currentRecord.value.timestamp() + retentionWindowDuration.toMillis()
-                            < currentTimestamp) {
-                            dedupTimestampedStore.delete(currentRecord.key);
-                        }
-                    }
-                }
-            });
+        dedupWindowStore = this.processorContext.getStateStore(windowStoreName);
}

@Override
public void process(Record<String, V> message) {
-        String key = message.key();
        try {
-            // Retrieve the matching key in the statestore and return null if found (signaling a duplicate)
-            if (dedupTimestampedStore.get(key) == null) {
-
-                // First time we see this record, store entry in the window store and forward the record to the output
-                dedupTimestampedStore.put(key,
-                    ValueAndTimestamp.make(key, processorContext.currentStreamTimeMs()));
-
-                processorContext.forward(ProcessingResult.wrapRecordSuccess(message));
-            }
+            // Get the record timestamp
+            var currentInstant = Instant.ofEpochMilli(message.timestamp());
+
+            // Fetch the matching keys in the window store around the record timestamp; a match signals a duplicate
+            try (var resultIterator = dedupWindowStore.backwardFetch(message.key(),
+                currentInstant.minus(retentionWindowDuration),
+                currentInstant.plus(retentionWindowDuration))) {
+                while (resultIterator != null && resultIterator.hasNext()) {
+                    var currentKeyValue = resultIterator.next();
+                    if (message.key().equals(currentKeyValue.value)) {
+                        return;
+                    }
+                }
+            }
+
+            // First time we see this record, store entry in the window store and forward the record to the output
+            dedupWindowStore.put(message.key(), message.key(), message.timestamp());
+            processorContext.forward(ProcessingResult.wrapRecordSuccess(message));
} catch (Exception e) {
processorContext.forward(ProcessingResult.wrapRecordFailure(e, message,
"Couldn't figure out what to do with the current payload: "
+ "An unlikely error occurred during deduplication transform"));
"Couldn't figure out what to do with the current payload: "
+ "An unlikely error occurred during deduplication transform"));
}
}

}
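
Worth noting on this file: the hourly punctuator (processorContext.schedule(Duration.ofHours(1), PunctuationType.WALL_CLOCK_TIME, ...)) that scanned the whole store and deleted expired entries is gone; a window store drops aged segments once the retention period elapses, so init only has to look up the store. The fetch bounds in process are symmetric around the record's own timestamp. A small worked example, assuming a one-hour retention:

// Assuming retentionWindowDuration = 1 hour and a record timestamped at 12:00:
Duration retention = Duration.ofHours(1);
Instant ts = Instant.parse("2023-12-21T12:00:00Z"); // record timestamp
Instant from = ts.minus(retention); // 11:00, catches earlier sightings of the key
Instant to = ts.plus(retention);    // 13:00, catches out-of-order records already
                                    // stored under a later timestamp
// backwardFetch(key, from, to) then iterates that range newest-first.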
DedupWithPredicateProcessor.java
@@ -3,23 +3,23 @@

import com.michelin.kstreamplify.error.ProcessingResult;
import java.time.Duration;
+import java.time.Instant;
import java.util.function.Function;
import org.apache.avro.specific.SpecificRecord;
-import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStore;


/**
- * Transformer class for the deduplication mechanism on keys of a given topic.
+ * Transformer class for the deduplication mechanism based on a predicate applied to records of a given topic.
*
* @param <K> The type of the key
* @param <V> The type of the value
*/
public class DedupWithPredicateProcessor<K, V extends SpecificRecord>
    implements Processor<K, V, K, ProcessingResult<V, V>> {

/**
* Kstream context for this transformer.
@@ -29,12 +29,12 @@ public class DedupWithPredicateProcessor<K, V extends SpecificRecord>
/**
* Window store containing all the records seen within the given window.
*/
-    private TimestampedKeyValueStore<String, V> dedupTimestampedStore;
+    private WindowStore<String, V> dedupWindowStore;

/**
* Window store name, initialized at construction.
*/
-    private final String dedupStoreName;
+    private final String windowStoreName;

/**
* Retention window for the statestore. Used for fetching data.
@@ -49,13 +49,13 @@ public class DedupWithPredicateProcessor<K, V extends SpecificRecord>
/**
* Constructor.
*
-     * @param dedupStoreName Name of the deduplication state store
+     * @param windowStoreName Name of the deduplication state store
* @param retentionWindowDuration Retention window duration
* @param deduplicationKeyExtractor Deduplication function
*/
-    public DedupWithPredicateProcessor(String dedupStoreName, Duration retentionWindowDuration,
+    public DedupWithPredicateProcessor(String windowStoreName, Duration retentionWindowDuration,
                                       Function<V, String> deduplicationKeyExtractor) {
-        this.dedupStoreName = dedupStoreName;
+        this.windowStoreName = windowStoreName;
this.retentionWindowDuration = retentionWindowDuration;
this.deduplicationKeyExtractor = deduplicationKeyExtractor;
}
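
For illustration, constructing this processor directly; the value type MyAvroValue and its getTransactionId() accessor are hypothetical, not part of this PR:

// Sketch: the extractor pulls the deduplication key out of the record value.
var processor = new DedupWithPredicateProcessor<String, MyAvroValue>(
    "dedup-predicate-store",         // window store name (hypothetical)
    Duration.ofDays(1),              // retention window
    MyAvroValue::getTransactionId);  // deduplication key extractor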
@@ -64,38 +64,37 @@ public DedupWithPredicateProcessor(String windowStoreName, Duration retentionWindowDuration,
public void init(ProcessorContext<K, ProcessingResult<V, V>> context) {
this.processorContext = context;

-        dedupTimestampedStore = this.processorContext.getStateStore(dedupStoreName);
-
-        processorContext.schedule(Duration.ofHours(1), PunctuationType.WALL_CLOCK_TIME,
-            currentTimestamp -> {
-                try (var iterator = dedupTimestampedStore.all()) {
-                    while (iterator.hasNext()) {
-                        var currentRecord = iterator.next();
-                        if (currentRecord.value.timestamp() + retentionWindowDuration.toMillis()
-                            < currentTimestamp) {
-                            dedupTimestampedStore.delete(currentRecord.key);
-                        }
-                    }
-                }
-            });
+        dedupWindowStore = this.processorContext.getStateStore(windowStoreName);
}

@Override
public void process(Record<K, V> message) {
        try {
+            // Get the record timestamp
+            var currentInstant = Instant.ofEpochMilli(message.timestamp());
            String identifier = deduplicationKeyExtractor.apply(message.value());
-            // Retrieve the matching identifier in the statestore and return null if found it (signaling a duplicate)
-            if (dedupTimestampedStore.get(identifier) == null) {
-                // First time we see this record, store entry in the window store and forward the record to the output
-                dedupTimestampedStore.put(identifier,
-                    ValueAndTimestamp.make(message.value(), message.timestamp()));
-                processorContext.forward(ProcessingResult.wrapRecordSuccess(message));
-            }
+
+            // Fetch the matching identifiers in the window store around the record timestamp; a match signals a duplicate
+            try (var resultIterator = dedupWindowStore.backwardFetch(identifier,
+                currentInstant.minus(retentionWindowDuration),
+                currentInstant.plus(retentionWindowDuration))) {
+                while (resultIterator != null && resultIterator.hasNext()) {
+                    var currentKeyValue = resultIterator.next();
+                    if (identifier.equals(deduplicationKeyExtractor.apply(currentKeyValue.value))) {
+                        return;
+                    }
+                }
+            }
+
+            // First time we see this record, store entry in the window store and forward the record to the output
+            dedupWindowStore.put(identifier, message.value(), message.timestamp());
+            processorContext.forward(ProcessingResult.wrapRecordSuccess(message));

} catch (Exception e) {
processorContext.forward(ProcessingResult.wrapRecordFailure(e, message,
"Couldn't figure out what to do with the current payload: "
+ "An unlikely error occurred during deduplication transform"));
"Couldn't figure out what to do with the current payload: "
+ "An unlikely error occurred during deduplication transform"));
}
}
}
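
In practice this processor is wired up through the utility methods in the next file, which register the window store and connect it by name. A usage sketch follows; the helper name deduplicateWithPredicate, the topic names, and MyAvroValue are assumptions, and only the parameter list is taken from the diff below:

// Usage sketch, assuming a static helper in DeduplicationUtils.
StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, MyAvroValue> stream = streamsBuilder.stream("input-topic");

KStream<String, ProcessingResult<MyAvroValue, MyAvroValue>> deduplicated =
    DeduplicationUtils.deduplicateWithPredicate(
        streamsBuilder, stream,
        "dedup-predicate-store", "dedup-repartition",  // store and repartition names
        Duration.ofDays(1),                            // retention window
        MyAvroValue::getTransactionId);                // deduplication key extractor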
DeduplicationUtils.java
@@ -11,7 +11,6 @@
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.WindowStore;

/**
@@ -69,18 +68,16 @@ public static <V extends SpecificRecord> KStream<String, ProcessingResult<V, V>>
StreamsBuilder streamsBuilder, KStream<String, V> initialStream, String storeName,
String repartitionName, Duration windowDuration) {

-        StoreBuilder<TimestampedKeyValueStore<String, String>> dedupStore =
-            Stores.timestampedKeyValueStoreBuilder(
-                Stores.persistentTimestampedKeyValueStore(storeName), Serdes.String(),
-                Serdes.String());
-        streamsBuilder.addStateStore(dedupStore);
+        StoreBuilder<WindowStore<String, String>> dedupWindowStore = Stores.windowStoreBuilder(
+            Stores.persistentWindowStore(storeName, windowDuration, windowDuration, false),
+            Serdes.String(), Serdes.String());
+        streamsBuilder.addStateStore(dedupWindowStore);

var repartitioned = initialStream.repartition(
            Repartitioned.with(Serdes.String(), SerdesUtils.<V>getSerdesForValue())
                .withName(repartitionName));
        return repartitioned.process(() -> new DedupKeyProcessor<>(storeName, windowDuration),
            storeName);
}

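On the Stores.persistentWindowStore(storeName, windowDuration, windowDuration, false) call above: the arguments are the store name, the retention period (how long entries survive before their segment is dropped), the window size, and the retainDuplicates flag. Retention and window size are both set to the deduplication window, and duplicates are not retained since one entry per key and window is enough. The same call with the parameters spelled out, for readability:

StoreBuilder<WindowStore<String, String>> dedupWindowStore = Stores.windowStoreBuilder(
    Stores.persistentWindowStore(
        storeName,       // state store name, also used to connect the processor
        windowDuration,  // retention period: entries older than this are expired
        windowDuration,  // window size, here equal to the retention period
        false),          // retainDuplicates: keep a single entry per key and window
    Serdes.String(), Serdes.String());
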
/**
@@ -192,11 +189,11 @@ public static <V extends SpecificRecord> KStream<String, ProcessingResult<V, V>>
StreamsBuilder streamsBuilder, KStream<String, V> initialStream, String storeName,
String repartitionName, Duration windowDuration,
Function<V, String> deduplicationKeyExtractor) {
-        StoreBuilder<TimestampedKeyValueStore<String, V>> dedupStore =
-            Stores.timestampedKeyValueStoreBuilder(
-                Stores.persistentTimestampedKeyValueStore(storeName), Serdes.String(),
-                SerdesUtils.getSerdesForValue());
-        streamsBuilder.addStateStore(dedupStore);
+        StoreBuilder<WindowStore<String, V>> dedupWindowStore = Stores.windowStoreBuilder(
+            Stores.persistentWindowStore(storeName, windowDuration, windowDuration, false),
+            Serdes.String(), SerdesUtils.getSerdesForValue());
+        streamsBuilder.addStateStore(dedupWindowStore);

var repartitioned = initialStream.repartition(
Repartitioned.with(Serdes.String(), SerdesUtils.<V>getSerdesForValue())
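
A sketch of an end-to-end check with TopologyTestDriver; the topology wiring, serdes, and topic names are assumed, not taken from this PR:

// Sketch: a duplicate inside the retention window should be dropped.
try (TopologyTestDriver driver = new TopologyTestDriver(topology, properties)) {
    TestInputTopic<String, MyAvroValue> input = driver.createInputTopic(
        "input-topic", new StringSerializer(), valueSerde.serializer());
    TestOutputTopic<String, MyAvroValue> output = driver.createOutputTopic(
        "output-topic", new StringDeserializer(), valueSerde.deserializer());

    input.pipeInput("key1", value);
    input.pipeInput("key1", value); // same key within the window

    assertEquals(1, output.getQueuedRecordCount()); // only the first record got through
}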