-
Notifications
You must be signed in to change notification settings - Fork 0
/
healthcheck.clj
37 lines (32 loc) · 1.34 KB
/
healthcheck.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
(ns kafka-clj-utils.healthcheck
(:require [clojure.tools.logging :as log]
[kafka-clj-utils.utils :refer [normalize-config]]
[kafka-clj-utils.producers :as p]
[integrant.core :as ig])
(:import java.util.Map
org.apache.kafka.clients.consumer.KafkaConsumer
org.apache.kafka.common.serialization.StringDeserializer))
(require 'kafka-clj-utils.specs)
(defn ->healthcheck
  "Returns a zero-argument function that probes Kafka using `kafka-config`.

  Every invocation of the returned function:
    * merges {:request.timeout.ms 12000} over `kafka-config` (overriding any
      value supplied by the caller) and passes the result through
      `normalize-config`,
    * opens a KafkaConsumer with String deserializers for key and value,
    * calls `.listTopics` on it and closes the consumer again.

  The returned map has :name set to the string kafka and :healthy? set to
  the boolean of the probe result; any Exception raised along the way is
  logged at error level and reported as :healthy? false."
  [kafka-config]
  (letfn [(list-topics []
            ;; Config is rebuilt on every probe so that failures inside
            ;; normalize-config are also caught by the caller's try.
            (let [consumer-props (normalize-config
                                   (merge kafka-config
                                          {:request.timeout.ms 12000}))
                  key-deserializer (StringDeserializer.)
                  value-deserializer (StringDeserializer.)]
              ;; with-open guarantees the consumer is closed even when
              ;; listing topics throws.
              (with-open [consumer (KafkaConsumer. ^Map consumer-props
                                                   key-deserializer
                                                   value-deserializer)]
                (.listTopics consumer))))]
    (fn []
      (let [probe-result (try
                           (list-topics)
                           (catch Exception ex
                             (log/error ex "Kafka healthcheck failed")
                             false))]
        {:name "kafka"
         :healthy? (boolean probe-result)}))))
;; Names :kafka/config as the spec for the value configured under
;; ::healthcheck. NOTE(review): the spec itself is presumably registered by
;; kafka-clj-utils.specs, which this file requires -- confirm.
(defmethod ig/pre-init-spec ::healthcheck
  [_component-key]
  :kafka/config)
;; Initialises the ::healthcheck component: the configured value is handed
;; to ->healthcheck, so the component is the zero-argument probe function
;; that ->healthcheck returns.
(defmethod ig/init-key ::healthcheck
  [_component-key config]
  (->healthcheck config))