-
Notifications
You must be signed in to change notification settings - Fork 111
/
Copy pathwindow.js
35 lines (26 loc) · 1.11 KB
/
window.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
"use strict";
const { KafkaStreams } = require("./../index.js");
const { nativeConfig: config } = require("./../test/test-config.js");
// Example: collect the messages of a topic that fall inside a fixed time
// window, print the first 10 of them, then shut all streams down.
const kafkaStreams = new KafkaStreams(config);

// Factory-level errors are only logged; the example keeps running.
kafkaStreams.on("error", (error) => {
    console.log("Error occured:", error.message);
});

const consumeStream = kafkaStreams.getKStream("my-input-topic");

const windowPeriod = 30 * 1000; // 30 seconds

// Derive both bounds from a single clock reading so that the window is
// exactly `windowPeriod` long.
const from = Date.now();
const to = from + windowPeriod;

// window will collect messages that fall in the period range
// a message with a timestamp larger or equal to "to" will end the window
// and emit all collected messages on the returned stream
// `abort` can be called to stop the window collection early, e.g.:
//   setTimeout(() => { abort(); }, 5000); // -> abort after 5 seconds
const { stream, abort } = consumeStream.window(from, to);

stream
    .take(10) // take the first 10 messages from within the window and close the stream
    .forEach((windowMessage) => {
        console.log(windowMessage); // do something with the message that was within the window
    })
    // done: return the shutdown result so that its failure reaches the catch below
    .then(() => kafkaStreams.closeAll())
    .catch((error) => {
        // Without this handler a failing window stream would surface as an
        // unhandled promise rejection.
        console.error("Window processing failed:", error);
    });

// start the stream; report start-up failures instead of dropping them
consumeStream.start().catch((error) => {
    console.error("Failed to start stream:", error);
});