Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kafka 3.x] Remove deprecated methods from TO #5522

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -11,8 +11,9 @@
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
Expand Down Expand Up @@ -44,7 +45,6 @@ public TopicStoreTopologyProvider(
}

@Override
@SuppressWarnings("deprecation")
public Topology get() {
StreamsBuilder builder = new StreamsBuilder();

Expand Down Expand Up @@ -87,8 +87,7 @@ public Topology get() {
* In the case of invalid store update result is not-null.
* Dispatcher applies the result to a waiting callback CompletionStage.
*/
@SuppressWarnings("deprecation")
private static class TopicCommandTransformer implements Processor<String, TopicCommand> {
private static class TopicCommandTransformer implements Processor<String, TopicCommand, Void, Void> {
private final String topicStoreName;
private final ForeachAction<? super String, ? super Integer> dispatcher;

Expand All @@ -109,22 +108,22 @@ public void init(ProcessorContext context) {
}

@Override
public void process(String key, TopicCommand value) {
String uuid = value.getUuid();
TopicCommand.Type type = value.getType();
public void process(final Record<String, TopicCommand> record) {
String uuid = record.value().getUuid();
TopicCommand.Type type = record.value().getType();
Integer result = null;
switch (type) {
case CREATE:
Topic previous = store.putIfAbsent(key, value.getTopic());
Topic previous = store.putIfAbsent(record.key(), record.value().getTopic());
if (previous != null) {
result = KafkaStreamsTopicStore.toIndex(TopicStore.EntityExistsException.class);
}
break;
case UPDATE:
store.put(key, value.getTopic());
store.put(record.key(), record.value().getTopic());
break;
case DELETE:
previous = store.delete(key);
previous = store.delete(record.key());
if (previous == null) {
result = KafkaStreamsTopicStore.toIndex(TopicStore.NoSuchEntityExistsException.class);
}
Expand Down