Skip to content

Commit

Permalink
feat: implement minimal topology (for example)
Browse files Browse the repository at this point in the history
  • Loading branch information
ccamel committed Apr 9, 2022
1 parent 18beac8 commit 9a4e94b
Showing 1 changed file with 33 additions and 0 deletions.
33 changes: 33 additions & 0 deletions src/main/kotlin/com/okp4/processor/cosmos/topology.kt
@@ -0,0 +1,33 @@
package com.okp4.processor.cosmos

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Named
import org.apache.kafka.streams.kstream.Produced
import org.slf4j.LoggerFactory
import java.util.*

/**
* Simple Kafka Stream Processor that consumes a message on a topic and returns a new message on another.
*/
fun topology(props: Properties): Topology {
val logger = LoggerFactory.getLogger("com.okp4.processor.cosmos.topology")
val topicIn = requireNotNull(props.getProperty("topic.in")) {
"Option 'topic.in' was not specified."
}
val topicOut = requireNotNull(props.getProperty("topic.out")) {
"Option 'topic.out' was not specified."
}

val streamsBuilder = StreamsBuilder()

streamsBuilder.stream(topicIn, Consumed.with(Serdes.String(), Serdes.String()).withName("input"))
.peek({ _, _ -> logger.info("Received a message") }, Named.`as`("log"))
.map({ k, v -> KeyValue(k, "Hello $v!") }, Named.`as`("map-value"))
.to(topicOut, Produced.with(Serdes.String(), Serdes.String()).withName("output"))

return streamsBuilder.build()
}

0 comments on commit 9a4e94b

Please sign in to comment.