forked from strimzi/strimzi-kafka-operator
-
Notifications
You must be signed in to change notification settings - Fork 0
/
TopicStoreTopologyProvider.java
139 lines (122 loc) · 5.26 KB
/
TopicStoreTopologyProvider.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.operator.topic;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.function.Supplier;
/**
* Kafka Streams topology provider for TopicStore.
*/
public class TopicStoreTopologyProvider implements Supplier<Topology> {
    private final String storeTopic;
    private final String topicStoreName;
    private final Properties kafkaProperties;
    private final ForeachAction<? super String, ? super Integer> dispatcher;

    /**
     * @param storeTopic input topic carrying {@code TopicCommand} records, keyed by Kafka topic name
     * @param topicStoreName name of the key-value state store holding the topics
     * @param kafkaProperties Kafka Streams configuration used when building the topology
     * @param dispatcher callback invoked with (command uuid, result index); a {@code null}
     *                   result indicates the store update succeeded
     */
    public TopicStoreTopologyProvider(
            String storeTopic,
            String topicStoreName,
            Properties kafkaProperties,
            ForeachAction<? super String, ? super Integer> dispatcher
    ) {
        this.storeTopic = storeTopic;
        this.topicStoreName = topicStoreName;
        this.kafkaProperties = kafkaProperties;
        this.dispatcher = dispatcher;
    }

    /**
     * Builds the topology: a single source stream of topic commands that is applied
     * to an in-memory, change-logged, compacted key-value store.
     */
    @Override
    public Topology get() {
        StreamsBuilder builder = new StreamsBuilder();

        // Changelog topic configuration for the state store: compacted so only the
        // latest state per topic key is retained.
        Map<String, String> configuration = new HashMap<>();
        configuration.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
        configuration.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, "0");
        configuration.put(TopicConfig.SEGMENT_BYTES_CONFIG, String.valueOf(64 * 1024 * 1024));

        // Input topic command -- store topic.
        // Key is the Kafka topic name -- which is also used as the KeyValue store key.
        KStream<String, TopicCommand> topicRequest = builder.stream(
                storeTopic,
                Consumed.with(Serdes.String(), new TopicCommandSerde())
        );

        // Data structure which holds all the topic information.
        StoreBuilder<KeyValueStore<String /* topic */, Topic>> topicStoreBuilder =
                Stores
                        .keyValueStoreBuilder(
                                Stores.inMemoryKeyValueStore(topicStoreName),
                                Serdes.String(), new TopicSerde()
                        )
                        .withCachingEnabled()
                        .withLoggingEnabled(configuration);
        builder.addStateStore(topicStoreBuilder);

        topicRequest.process(
                () -> new TopicCommandTransformer(topicStoreName, dispatcher),
                topicStoreName
        );

        return builder.build(kafkaProperties);
    }

    /**
     * This processor applies a topic command to the key-value store.
     * It then updates the dispatcher with the store modification result.
     * In the case of an invalid store update the result is non-null.
     * The dispatcher applies the result to a waiting callback CompletionStage.
     */
    private static class TopicCommandTransformer implements Processor<String, TopicCommand, Void, Void> {
        private final String topicStoreName;
        private final ForeachAction<? super String, ? super Integer> dispatcher;
        private KeyValueStore<String, Topic> store;

        public TopicCommandTransformer(
                String topicStoreName,
                ForeachAction<? super String, ? super Integer> dispatcher
        ) {
            this.topicStoreName = topicStoreName;
            this.dispatcher = dispatcher;
        }

        @Override
        public void init(ProcessorContext<Void, Void> context) {
            // The new processor API's getStateStore is a generic method,
            // so no unchecked cast (or @SuppressWarnings) is needed here.
            store = context.getStateStore(topicStoreName);
        }

        @Override
        public void process(final Record<String, TopicCommand> record) {
            String uuid = record.value().getUuid();
            TopicCommand.Type type = record.value().getType();
            Integer result = null; // null signals a successful store update
            switch (type) {
                case CREATE: {
                    Topic previous = store.putIfAbsent(record.key(), record.value().getTopic());
                    if (previous != null) {
                        // Cannot create a topic that already exists in the store.
                        result = KafkaStreamsTopicStore.toIndex(TopicStore.EntityExistsException.class);
                    }
                    break;
                }
                case UPDATE:
                    store.put(record.key(), record.value().getTopic());
                    break;
                case DELETE: {
                    Topic previous = store.delete(record.key());
                    if (previous == null) {
                        // Cannot delete a topic that was never stored.
                        result = KafkaStreamsTopicStore.toIndex(TopicStore.NoSuchEntityExistsException.class);
                    }
                    break;
                }
            }
            dispatcher.apply(uuid, result);
        }

        @Override
        public void close() {
            // No resources to release; the state store is owned by the Streams runtime.
        }
    }
}