-
Notifications
You must be signed in to change notification settings - Fork 1
/
topic_administrator.ts
44 lines (40 loc) · 1.27 KB
/
topic_administrator.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import {Kafka, ITopicConfig, AdminConfig} from 'kafkajs';
/**
 * Creates and deletes Kafka topics through a short-lived kafkajs admin client.
 *
 * Every operation obtains its own admin client from the injected `Kafka`
 * instance, connects, performs the request and disconnects again, so no
 * connection is held between calls.
 */
export class TopicAdministrator {
  /**
   * Default admin client configuration used when the constructor is not given
   * one: a single retry with retry times between 300 and 500.
   */
  public static adminClientConfiguration: AdminConfig = {
    retry: {
      retries: 1,
      initialRetryTime: 300,
      maxRetryTime: 500
    }
  };

  /**
   * @param kafka - Client used to obtain admin clients.
   * @param topicConfig - Settings merged into every topic created by
   *   {@link TopicAdministrator.createTopic}; the topic name is supplied per call.
   * @param adminConfig - Configuration handed to `kafka.admin()`; defaults to
   *   {@link TopicAdministrator.adminClientConfiguration}.
   */
  constructor(
    protected kafka: Kafka,
    protected topicConfig: Omit<ITopicConfig, 'topic'> = {},
    protected adminConfig: AdminConfig = TopicAdministrator.adminClientConfiguration
  ) {
    // Bind so the methods can be handed around as bare callbacks.
    this.createTopic = this.createTopic.bind(this);
    this.deleteTopic = this.deleteTopic.bind(this);
  }

  /**
   * Creates `topic` using the configured topic settings, passing
   * `waitForLeaders: true` to the admin client.
   *
   * @param topic - Name of the topic to create.
   * @throws Whatever the admin client rejects with; the client is disconnected
   *   before the error propagates.
   */
  public async createTopic(topic: string): Promise<void> {
    await this.withAdminClient(async admin => {
      await admin.createTopics({
        topics: [
          {
            topic,
            ...this.topicConfig
          }
        ],
        waitForLeaders: true
      });
    });
  }

  /**
   * Deletes `topic`.
   *
   * @param topic - Name of the topic to delete.
   * @throws Whatever the admin client rejects with; the client is disconnected
   *   before the error propagates.
   */
  public async deleteTopic(topic: string): Promise<void> {
    await this.withAdminClient(async admin => {
      await admin.deleteTopics({
        topics: [topic]
      });
    });
  }

  /**
   * Runs `operation` against a freshly connected admin client and always
   * disconnects that client afterwards, whether the operation resolves or
   * rejects.
   *
   * @param operation - Work to perform with the connected admin client.
   * @returns The value `operation` resolves with.
   */
  protected async withAdminClient<T>(
    operation: (admin: ReturnType<Kafka['admin']>) => Promise<T>
  ): Promise<T> {
    const admin = this.kafka.admin(this.adminConfig);
    await admin.connect();
    try {
      return await operation(admin);
    } finally {
      // Without this, a rejected request would leave the client connected.
      await admin.disconnect();
    }
  }
}