-
Notifications
You must be signed in to change notification settings - Fork 0
/
admin.clj
55 lines (47 loc) · 2.4 KB
/
admin.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
(ns kafka-clojure-client.admin
(:require [clojure.tools.logging :as log])
(:import (org.apache.kafka.clients.consumer KafkaConsumer OffsetAndMetadata Consumer)
(org.apache.kafka.common TopicPartition PartitionInfo)
(org.apache.kafka.common.serialization StringDeserializer)
(java.util Map)
(java.io Closeable)))
(defn- get-partitions-for-topic
  "Returns a lazy seq of the partition numbers that `consumer` reports for
  `topic` via `.partitionsFor`. Yields an empty seq when `.partitionsFor`
  returns nil or an empty collection."
  [^Consumer consumer topic]
  (for [^PartitionInfo info (.partitionsFor consumer topic)]
    (.partition info)))
(defn- get-committed-offset-for-partition
  "Looks up the offset committed for partition number `partition` of `topic`
  through `consumer`. Returns that offset, or 0 when `.committed` returns nil
  (nothing committed for the partition)."
  [^Consumer consumer topic partition]
  (let [topic-partition (TopicPartition. topic partition)
        ^OffsetAndMetadata committed (.committed consumer topic-partition)]
    (if (nil? committed)
      0
      (.offset committed))))
(defn- ^Map get-end-offsets-for-partitions
  "Asks `consumer` for the end offsets of `partitions` (a collection of
  TopicPartition) and returns the resulting map.

  Best effort: if the lookup throws any Exception, a warning is logged and a
  map associating every requested partition with 0 is returned instead."
  [^Consumer consumer partitions]
  (try
    (.endOffsets consumer partitions)
    (catch Exception ex
      (log/warn ex "Get log end offset failed" partitions)
      ;; Fallback: every requested partition is reported with end offset 0.
      (zipmap partitions (repeat 0)))))
(defn- get-lags
  "Computes, for every partition of `topic`, the difference between its log
  end offset and the offset committed through `consumer`.

  Returns a sorted map of partition number -> lag, or nil when no partitions
  are found for the topic. A partition missing from the end-offset map is
  treated as having end offset 0."
  [^Consumer consumer topic]
  (when-let [partition-ids (seq (get-partitions-for-topic consumer topic))]
    (let [->topic-partition #(TopicPartition. topic %)
          end-offsets       (get-end-offsets-for-partitions
                              consumer
                              (map ->topic-partition partition-ids))
          lag-entry         (fn [partition-id]
                              ;; Committed offset is fetched first, then the
                              ;; end offset is looked up in the prefetched map.
                              (let [committed (get-committed-offset-for-partition
                                                consumer topic partition-id)
                                    log-end   (get end-offsets
                                                   (->topic-partition partition-id)
                                                   0)]
                                [partition-id (- log-end committed)]))]
      (into (sorted-map) (map lag-entry partition-ids)))))
(defprotocol ConsumerLagChecker
  "Something that can report consumer lag for a topic."
  (get-lags-for-topic [this topic]
    "Returns the lag information for `topic`; the shape of the result is
    defined by the implementing type."))
;; Wraps a Kafka consumer. Lag queries are delegated to `get-lags`, and
;; closing the client closes the wrapped consumer.
(deftype MonitorClient [^Consumer consumer]
  Closeable
  (close [_this]
    ;; Release the underlying consumer.
    (.close consumer))

  ConsumerLagChecker
  (get-lags-for-topic [_this topic]
    (get-lags consumer topic)))
(defn create-monitor-client
  "Creates a MonitorClient backed by a new KafkaConsumer.

  `bootstrap-servers` is used as the \"bootstrap.servers\" setting and
  `group-id` as the \"group.id\" setting. Auto-commit is disabled and the
  session timeout is set to 30000 ms. Keys and values are deserialized with
  StringDeserializer instances passed directly to the consumer.

  The returned client wraps an open consumer; the caller should close it
  when done."
  [bootstrap-servers group-id]
  (let [;; Use the fully qualified class name: the bare \"StringDeserializer\"
        ;; is not a resolvable class name for these settings.
        ;; NOTE(review): the explicit deserializer instances passed below are
        ;; expected to take precedence over these entries — confirm against
        ;; the Kafka client version in use.
        deserializer-class (.getName StringDeserializer)
        config {"bootstrap.servers"  bootstrap-servers
                "group.id"           group-id
                "enable.auto.commit" "false"
                "session.timeout.ms" "30000"
                "key.deserializer"   deserializer-class
                "value.deserializer" deserializer-class}
        ;; The ^Map hint selects the Map-based KafkaConsumer constructor.
        consumer (KafkaConsumer. ^Map config
                                 (StringDeserializer.)
                                 (StringDeserializer.))]
    (MonitorClient. consumer)))