-
Notifications
You must be signed in to change notification settings - Fork 28
/
adminApi.js
185 lines (162 loc) · 6.49 KB
/
adminApi.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
const kafka = require('kafka-node');
const { zipArrays } = require('../utils/arrayHelper');
const offsetApi = require('./offsetApi');
const logger = require('../utils/logger');
// Namespace object that the admin API functions below are attached to; it is this module's export.
const adminApi = {};
/**
 * Wraps a promise-returning function so that every invocation is raced against a timeout.
 *
 * Only the most recent invocation is tracked. Calling the returned function again while a
 * previous call is still pending cancels that call's timer and rejects its promise with the
 * string 'ignore', so callers can tell a superseded request from a real failure.
 *
 * The timeout grows with each invocation: call number k (capped at `triesIncreaseCap`) waits
 * `initialMsToTimeout + (k - 1) * msIncreasePerTry` milliseconds. The counter is never reset.
 *
 * @param {Function} callback Function that returns a promise; its name is used in the timeout message
 * @param {Number} initialMsToTimeout Timeout in milliseconds for the first invocation
 * @param {Number} msIncreasePerTry Milliseconds added to the timeout for each further invocation
 * @param {Number} triesIncreaseCap Invocation count after which the timeout stops growing
 * @returns {Function} Function taking the same arguments as `callback` and returning a promise that
 * settles with the callback's outcome, rejects with an Error on timeout, or rejects with 'ignore'
 * when superseded by a newer invocation
 */
function wrapInTimeout(
  callback,
  initialMsToTimeout = 15000,
  msIncreasePerTry = 5000,
  triesIncreaseCap = 1,
) {
  let tries = 0;
  let currentTimeoutId = null;
  let currentReject = null;

  return function invokeWithTimeout(...args) {
    // A newer invocation supersedes whatever is still in flight.
    if (currentTimeoutId) {
      clearTimeout(currentTimeoutId);
      currentTimeoutId = null;
    }
    if (currentReject) {
      currentReject('ignore');
      currentReject = null;
    }

    return new Promise((resolve, reject) => {
      currentReject = reject;
      tries = Math.min(tries + 1, triesIncreaseCap);
      const msToTimeout = initialMsToTimeout + (tries - 1) * msIncreasePerTry;
      let timeoutId = null;

      // Stops this invocation's timer and, if this invocation is still the tracked one,
      // forgets it so the shared state never points at an already-settled promise.
      const release = () => {
        if (timeoutId !== null) {
          clearTimeout(timeoutId);
        }
        if (currentReject === reject) {
          currentReject = null;
          currentTimeoutId = null;
        }
      };

      // The callback is started before the timer is armed so that a synchronous throw
      // (turned into a rejection by the Promise constructor) cannot leave a timer behind.
      callback(...args).then(
        result => {
          release();
          resolve(result);
        },
        err => {
          release();
          reject(err);
        },
      );

      timeoutId = setTimeout(() => {
        release();
        reject(new Error(`${callback.name} timed out after ${msToTimeout}ms`));
      }, msToTimeout);
      currentTimeoutId = timeoutId;
    });
  };
}
/**
 * Lists the topics of a Kafka cluster together with their partition and message counts.
 *
 * Opens a dedicated client connection, asks the admin API for the topic metadata, drops the
 * names in `topicNamesToIgnore`, and then looks up the message count of every remaining topic
 * through `adminApi.getTopicMsgCount`. The client is closed on every outcome.
 *
 * @param {String} kafkaHostURI URI of Kafka broker(s)
 * @returns {Promise<Array<{topicName: String, numberOfPartitions: Number, msgCount: Number}>>}
 * Rejects with an Error prefixed 'getting list of Topics:' when the topic list cannot be
 * fetched or read, or 'getting all topicMsgCounts:' when a message count lookup fails.
 */
function getTopicData(kafkaHostURI) {
  const topicNamesToIgnore = ['__consumer_offsets', 'null', 'undefined'];
  return new Promise((resolve, reject) => {
    // Declares a new instance of client that will be used to make a connection
    const client = new kafka.KafkaClient({ kafkaHost: kafkaHostURI });
    // Declaring a new kafka.Admin instance creates a connection to the Kafka admin API
    const admin = new kafka.Admin(client);

    // Every failure path goes through here so the connection is never left open.
    const fail = (context, cause) => {
      client.close();
      return reject(new Error(`${context}:${cause}`));
    };

    // Fetch all topics from the Kafka broker
    admin.listTopics((err, data) => {
      if (err) return fail('getting list of Topics', err);

      let topics;
      try {
        logger.log('Result of admin.listTopics API call:', data);
        // The second element of the response carries the per-topic metadata
        const topicsMetadata = data[1].metadata;
        logger.log('topicsMetadata obtained:', topicsMetadata);
        topics = Object.entries(topicsMetadata)
          .filter(([topicName]) => !topicNamesToIgnore.includes(topicName))
          .map(([topicName, topicPartitions]) => ({
            numberOfPartitions: Object.keys(topicPartitions).length,
            topicName,
          }));
      } catch (parseError) {
        // A malformed response would otherwise throw inside this callback, leaving the
        // promise pending and the client open.
        return fail('getting list of Topics', parseError);
      }

      // One message-count lookup per topic, run in parallel
      const promises = topics.map(({ topicName, numberOfPartitions }) =>
        adminApi.getTopicMsgCount(kafkaHostURI, topicName, numberOfPartitions),
      );
      return Promise.all(promises)
        .then(topicMsgCounts => {
          const result = zipArrays(topics, topicMsgCounts).map(([topicInfo, msgCount]) => ({
            msgCount,
            ...topicInfo,
          }));
          logger.log('final topic Data:', result);
          client.close();
          return resolve(result);
        })
        .catch(error => fail('getting all topicMsgCounts', error));
    });
  });
}
/**
 * @param {String} kafkaHostURI the connection uri that the user types into connection input
 *
 * @returns {Promise} Resolves to an array with one entry per topic, each holding the
 * topicName, its numberOfPartitions and its msgCount
 *
 * Makes a connection to Kafka server to fetch a list of topics
 * Transforms the data coming back from the Kafka broker into pertinent data to send back to client
 *
 * This is getTopicData wrapped by wrapInTimeout: the first call times out after 20000 ms and
 * each further call waits 5000 ms longer, with the growth stopping at the 10th call.
 * Calling it again while an earlier call is still pending rejects that earlier promise
 * with the string 'ignore'.
 */
adminApi.getTopicData = wrapInTimeout(getTopicData, 20000, 5000, 10);
/**
 * Counts the messages currently held by a topic.
 *
 * Starts one `adminApi.getPartitionMsgCount` lookup for each partition id from 0 up to
 * (but not including) `numberOfPartitions`, waits for all of them, and adds the results.
 *
 * @param {String} kafkaHostURI URI of Kafka broker(s)
 * @param {String} topicName Single topic to lookup
 * @param {Number} numberOfPartitions Number of partitions in a topic
 *
 * @returns {Promise} Resolves to the sum of the message counts of all partitions (0 when
 * there are no partitions); rejects with the first partition lookup error
 */
adminApi.getTopicMsgCount = async (kafkaHostURI, topicName, numberOfPartitions) => {
  // Kick off every partition lookup up front so they run in parallel
  const pendingCounts = [];
  let partitionId = 0;
  while (partitionId < numberOfPartitions) {
    pendingCounts.push(adminApi.getPartitionMsgCount(kafkaHostURI, topicName, partitionId));
    partitionId += 1;
  }

  const partitionCounts = await Promise.all(pendingCounts);

  // Fold the per-partition counts into a single total
  let topicTotal = 0;
  for (const partitionCount of partitionCounts) {
    topicTotal += partitionCount;
  }
  return topicTotal;
};
/**
 * Counts the messages currently held by one partition of a topic.
 *
 * Fetches the earliest and the latest offset of the partition in parallel through
 * `offsetApi` and returns their difference.
 *
 * @param {String} kafkaHostURI URI of Kafka broker(s)
 * @param {String} topicName Single topic to lookup
 * @param {Number} partitionId Topic partition number. Defaults to 0
 *
 * @returns {Promise} Resolves to latest offset minus earliest offset; rejects with the
 * error of whichever offset lookup fails first
 */
adminApi.getPartitionMsgCount = async (kafkaHostURI, topicName, partitionId = 0) => {
  const [firstOffset, lastOffset] = await Promise.all([
    offsetApi.getEarliestOffset(kafkaHostURI, topicName, partitionId),
    offsetApi.getLatestOffset(kafkaHostURI, topicName, partitionId),
  ]);
  return lastOffset - firstOffset;
};
/**
 * Looks up which broker leads a partition and which brokers hold its replicas.
 *
 * Opens a dedicated client connection, calls listTopics, and reads the metadata entry of the
 * requested topic partition. The client is closed as soon as listTopics answers, whatever
 * the outcome.
 *
 * @param {String} kafkaHostURI URI of Kafka broker(s)
 * @param {String} topicName Single topic to lookup
 * @param {Number} partitionId Topic partition number. Defaults to 0
 *
 * @returns {Promise<Array>} Resolves to a two-element array: the leader broker, followed by
 * an array of the replica brokers with the leader filtered out. Rejects with the listTopics
 * error as-is, or with an Error when the topic/partition is missing from the metadata.
 */
adminApi.getPartitionBrokers = (kafkaHostURI, topicName, partitionId = 0) => {
  const client = new kafka.KafkaClient({ kafkaHost: kafkaHostURI });
  const admin = new kafka.Admin(client);
  return new Promise((resolve, reject) => {
    admin.listTopics((err, data) => {
      // The response (or error) is already in hand, so the connection is no longer needed.
      // Closing here covers the error paths that previously leaked the client.
      client.close();
      if (err) return reject(err); // TODO: Handle listTopics error properly
      try {
        // The second element of the response carries the per-topic metadata
        const topicsMetadata = data[1].metadata;
        const { leader, replicas } = topicsMetadata[topicName][partitionId];
        const followerReplicas = replicas.filter(b => b !== leader);
        return resolve([leader, followerReplicas]);
      } catch (lookupError) {
        // An unknown topic or partition would otherwise throw inside this callback and
        // leave the promise pending forever.
        return reject(
          new Error(`getting brokers for ${topicName} partition ${partitionId}:${lookupError}`),
        );
      }
    });
  });
};
// Expose the adminApi object, with the functions attached above, as this module's export.
module.exports = adminApi;