-
Notifications
You must be signed in to change notification settings - Fork 111
/
Copy pathmergeTopics.js
35 lines (28 loc) · 1.15 KB
/
mergeTopics.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
"use strict";
const { KafkaStreams } = require("./../index.js");
const { nativeConfig: config } = require("./../test/test-config.js");
// Example: consume three input topics, merge them into a single stream and
// produce the merged messages to one output topic.
const kafkaStreams = new KafkaStreams(config);

// Errors emitted by the KafkaStreams instance are reported here.
kafkaStreams.on("error", (error) => {
    console.log("Error occured:", error.message);
});

const stream1 = kafkaStreams.getKStream("my-input-topic-1");
const stream2 = kafkaStreams.getKStream("my-input-topic-2");
const stream3 = kafkaStreams.getKStream("my-input-topic-3");

//merge will make sure any message that is consumed on any of the streams
//will end up being emitted in the merged stream
//check out the other operations: join (outer, left, inner), combine, zip
//for other merge options
const mergedStream = stream1.merge(stream2).merge(stream3);

mergedStream
    .filter(v => !!v); //you can use this stream as usual

//how long the example consumes and produces before shutting down
const RUN_DURATION_MS = 5000;

//wait for the 3 kafka consumers
//and the 1 kafka producer to be ready
Promise.all([
    stream1.start(),
    stream2.start(),
    stream3.start(),
    mergedStream.to("my-merged-output-topic") //BE AWARE that .to()s on a merged stream are async
]).then(() => {
    //consume and produce for 5 seconds, then close everything
    setTimeout(kafkaStreams.closeAll.bind(kafkaStreams), RUN_DURATION_MS);
}).catch((error) => {
    //a consumer or the producer failed to get ready: report it instead of
    //leaving an unhandled rejection, and close whatever was already opened
    console.error("Failed to start streams:", error && error.message ? error.message : error);
    process.exitCode = 1;
    kafkaStreams.closeAll();
});