-
Notifications
You must be signed in to change notification settings - Fork 0
/
DataStoreWrapper.java
120 lines (103 loc) · 4.65 KB
/
DataStoreWrapper.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package org.wildfly.blog.kafka.streams;
import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.wildfly.blog.reactive.messaging.common.PageVisit;
/**
* @author <a href="mailto:kabir.khan@jboss.com">Kabir Khan</a>
*/
@ApplicationScoped
public class DataStoreWrapper implements Closeable {
private volatile KafkaStreams streams;
@Inject
private ConfigSupplier configSupplier = new ConfigSupplier() {
@Override
public String getBootstrapServers() {
return "localhost:9092";
}
@Override
public String getTopicName() {
return "page-visits";
}
};
DataStoreWrapper() {
}
@PostConstruct
void init() {
try {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, configSupplier.getBootstrapServers()); // assuming that the Kafka broker this application is talking to runs on local machine with port 9092
props.putIfAbsent(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, PageVisitSerde.class.getName());
// For this we want to read all the data
props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
final StreamsBuilder builder = new StreamsBuilder();
KeyValueBytesStoreSupplier stateStore = Stores.inMemoryKeyValueStore("test-store");
KTable<String, PageVisit> source = builder.table(
configSupplier.getTopicName(),
Materialized.<String, PageVisit>as(stateStore)
.withKeySerde(Serdes.String())
.withValueSerde(new PageVisitSerde()));
final Topology topology = builder.build();
this.streams = new KafkaStreams(topology, props);
final CountDownLatch startLatch = new CountDownLatch(1);
final AtomicReference<KafkaStreams.State> state = new AtomicReference<>();
streams.setStateListener((newState, oldState) -> {
state.set(newState);
switch (newState) {
case RUNNING:
case ERROR:
case PENDING_SHUTDOWN:
startLatch.countDown();
}
});
this.streams.start();
startLatch.await(10, TimeUnit.SECONDS);
System.out.println("Stream started");
if (state.get() != KafkaStreams.State.RUNNING) {
throw new IllegalStateException();
}
} catch (Exception e) {
if (this.streams != null) {
this.streams.close();
}
throw new RuntimeException(e);
}
}
public Map<String, String> readLastVisitedPageByUsers() {
StoreQueryParameters<ReadOnlyKeyValueStore<String, PageVisit>> sqp = StoreQueryParameters.fromNameAndType("test-store", QueryableStoreTypes.keyValueStore());
final ReadOnlyKeyValueStore<String, PageVisit> store = this.streams.store(sqp);
Map<String, String> lastPageByUser = new HashMap<>();
KeyValueIterator<String, PageVisit> it = store.all();
it.forEachRemaining(keyValue -> lastPageByUser.put(keyValue.key, keyValue.value.getPage()));
return lastPageByUser;
}
@PreDestroy
public void close() {
this.streams.close();
}
}