-
Notifications
You must be signed in to change notification settings - Fork 2
/
ConsumerStream.ts
104 lines (89 loc) · 2.29 KB
/
ConsumerStream.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
/* eslint-disable no-underscore-dangle, no-await-in-loop */
import { Consumer, ConsumerConfig, ConsumerRunConfig, Kafka } from "kafkajs";
import { Readable } from "stream";
export class ConsumerStream extends Readable {
constructor(
kafka: Kafka,
options: {
config?: ConsumerConfig;
runConfig?: ConsumerRunConfig;
topic: { topic: string; fromBeginning?: boolean };
},
) {
super();
this.kafka = kafka;
this.config = options.config;
this.runConfig = options.runConfig;
this.topic = options.topic;
this.init();
}
private consumer: Consumer;
private kafka: Kafka;
private config?: ConsumerConfig;
private runConfig?: ConsumerRunConfig;
private topic: { topic: string; fromBeginning?: boolean };
private connected: boolean;
private started: boolean;
private paused: boolean;
private init() {
this.connected = false;
this.started = false;
this.paused = false;
}
_read() {
(async () => {
try {
await this.start();
} catch (e) {
this.destroy(e);
}
})();
}
private async start() {
if (!this.connected) {
this.connected = true;
this.consumer = this.kafka.consumer(this.config);
await this.consumer.connect();
await this.consumer.subscribe(this.topic);
this.consumer.on("consumer.crash", this.onCrash);
}
if (!this.started) {
this.started = true;
await this.run();
}
if (this.paused) {
this.paused = false;
}
}
private onCrash = async (err: Error) => {
console.error(err);
this.init();
await this.start();
};
private async run() {
await this.consumer.run({
eachBatchAutoResolve: false,
eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
if (this.paused) {
return;
}
for (const message of batch.messages) {
if (this.paused) {
break;
}
const continueToPush = this.push(message.value);
resolveOffset(message.offset);
await heartbeat();
if (!continueToPush) {
this.paused = true;
}
}
},
...this.runConfig,
});
}
_destroy(error: Error | null) {
this.consumer.disconnect();
super.destroy(error === null ? undefined : error);
}
}