-
Notifications
You must be signed in to change notification settings - Fork 1
/
ProcessorAPI.kt
47 lines (37 loc) · 1.47 KB
/
ProcessorAPI.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package dev.chrzaszcz.kafka.examples.streams
import mu.KotlinLogging
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.processor.ProcessorSupplier
import java.util.*
private val logger = KotlinLogging.logger {}
fun main() {
val props = Properties()
props[StreamsConfig.APPLICATION_ID_CONFIG] = "count-application"
props[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass
props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.String().javaClass
createTopic(props)
val topology = Topology()
topology.addSource("mySource", "WordsTopic")
topology.addProcessor("myProcessor", ProcessorSupplier { StringCounter() }, "mySource")
topology.addSink(
"mySink",
"CountsTopic",
Serdes.String().serializer(),
Serdes.Integer().serializer(),
"myProcessor"
)
logger.info { topology.describe() }
val streams = KafkaStreams(topology, props)
streams.start()
}
private fun createTopic(props: Properties) {
logger.info { "Creating topic" }
val adminClient = AdminClient.create(props)
adminClient.createTopics(listOf(NewTopic("WordsTopic", 1, 1)))
}