-
Notifications
You must be signed in to change notification settings - Fork 1
/
FavouriteColorApp.java
112 lines (90 loc) · 5.32 KB
/
FavouriteColorApp.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package kafka.streams.favouriteColor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
/**
 * Kafka Streams demo that maintains a running count of how many users currently
 * favour each colour.
 *
 * <p>Pipeline built in {@link #main(String[])}:
 * <ol>
 *   <li>read {@code user,colour} text lines from {@code favourite-color-input};</li>
 *   <li>re-key each line by user, keep only the allowed colours and write the result to
 *       {@code user-keys-and-colors};</li>
 *   <li>read that topic back as a table, group by colour and count;</li>
 *   <li>write the counts to {@code favourite-color-output} (String keys, Long values).</li>
 * </ol>
 */
public class FavouriteColorApp {

    // Start the servers
    // bin/zookeeper-server-start.sh config/zookeeper.properties
    // bin/kafka-server-start.sh config/server-1.properties
    // bin/kafka-server-start.sh config/server-2.properties
    // bin/kafka-server-start.sh config/server-3.properties

    // Problem:
    // input:
    // stephan,blue
    // john,green
    // stephan,red
    // alice,red

    // Create the topics
    // Input topic:
    // bin/kafka-topics.sh --create --topic favourite-color-input --zookeeper localhost:2181 --replication-factor 1 --partitions 1
    // Create the intermediate log-compacted topic (compaction is just an optimization, it won't change the result)
    // bin/kafka-topics.sh --create --topic user-keys-and-colors --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --config cleanup.policy=compact
    // Create the output log-compacted topic (compaction is just an optimization, it won't change the result)
    // bin/kafka-topics.sh --create --topic favourite-color-output --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --config cleanup.policy=compact

    // View the list of topics
    // bin/kafka-topics.sh --list --zookeeper localhost:2181

    // Launch a Kafka consumer
    // bin/kafka-console-consumer.sh --topic favourite-color-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --bootstrap-server localhost:9092

    // Launch a Kafka producer to accept input
    // bin/kafka-console-producer.sh --broker-list localhost:9092 --topic favourite-color-input
    // input:
    // stephan,blue
    // john,green
    // stephan,red
    // alice,red

    /** Field separator of an input line ({@code user,colour}). */
    private static final String SEPARATOR = ",";

    /**
     * Colours that are counted; every other colour is dropped. The entries are lower case,
     * so colours must be normalised to lower case before being checked against this list.
     */
    private static final List<String> ALLOWED_COLORS = Arrays.asList("green", "blue", "red");

    /**
     * Builds the favourite-colour topology, starts it and registers a shutdown hook that
     * closes the streams instance when the JVM exits.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.putIfAbsent(StreamsConfig.APPLICATION_ID_CONFIG, "favourite-color-application"); // group.id
        props.putIfAbsent(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
        props.putIfAbsent(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.putIfAbsent(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Only set this to 0 when doing development. Disabling the cache demonstrates all the
        // steps involved in the transformation - don't do it in prod.
        props.putIfAbsent(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
        props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        StreamsBuilder builder = new StreamsBuilder();

        // Read from the Kafka input topic
        // input:
        // stephan,blue
        KStream<String, String> textLines = builder.stream("favourite-color-input");

        // Step 1: Create a topic of user keys and colors, e.g. key=STEPHAN value=red
        KStream<String, String> usersAndColors = textLines
                // Ignore bad values: null records and lines without both a user and a color
                // field, which would otherwise throw when the fields are indexed below
                .filter((key, value) -> hasUserAndColor(value))
                // Make the user the key
                .selectKey((key, value) -> userOf(value))
                // Make the color the value (lower case, to match ALLOWED_COLORS)
                .mapValues(value -> colorOf(value))
                // Make sure the color is in the allowed color list
                .filter((user, color) -> ALLOWED_COLORS.contains(color));
        usersAndColors.to("user-keys-and-colors");

        // Step 2: Read the topic back as a KTable so that updates are read correctly.
        // This intermediate state will be stored by Kafka as a topic,
        // e.g. favourite-color-application-user-keys-and-colors-STATE-STORE-0000000006-changelog
        KTable<String, String> userAndColorsTable = builder.table("user-keys-and-colors");

        // Step 3: Count the occurrences of the colors.
        // The aggregation state is stored in Kafka,
        // e.g. favourite-color-application-KTABLE-AGGREGATE-STATE-STORE-0000000010-changelog
        KTable<String, Long> favouriteColor = userAndColorsTable
                // Group by color and then count, making the color the key, e.g. red: 2.
                // Changing the key causes a re-partition.
                .groupBy((userIgnored, color) -> new KeyValue<>(color, color))
                .count();
        favouriteColor.toStream().to("favourite-color-output", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Graceful shutdown; registered before start() so the instance is closed even if
        // the JVM is asked to exit right after startup.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

        // Only do this in dev, not in prod
        streams.cleanUp();
        streams.start();

        // Print the description of the streams instance
        System.out.println(streams.toString());
    }

    /**
     * Tells whether an input line can be split into a user and a color.
     *
     * <p>{@code String.split} drops trailing empty fields, so {@code "stephan,"} yields a
     * single field and is rejected here rather than failing later with an
     * {@code ArrayIndexOutOfBoundsException}.
     *
     * @param line raw record value, may be {@code null}
     * @return {@code true} if the line is non-null and has at least two fields
     */
    private static boolean hasUserAndColor(String line) {
        return line != null && line.split(SEPARATOR).length >= 2;
    }

    /**
     * Extracts the user (first field) of a line accepted by {@link #hasUserAndColor(String)}.
     *
     * @param line raw record value with at least two fields
     * @return the user name in upper case
     */
    private static String userOf(String line) {
        return line.split(SEPARATOR)[0].toUpperCase(Locale.ROOT);
    }

    /**
     * Extracts the color (second field) of a line accepted by {@link #hasUserAndColor(String)}.
     *
     * @param line raw record value with at least two fields
     * @return the color in lower case, the form used by {@link #ALLOWED_COLORS}
     */
    private static String colorOf(String line) {
        return line.split(SEPARATOR)[1].toLowerCase(Locale.ROOT);
    }
}