/
kafka_consumer.clj
67 lines (61 loc) · 3.09 KB
/
kafka_consumer.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
(ns common-clj.components.consumer.kafka-consumer
(:require [cheshire.core :refer [parse-string]]
[com.stuartsierra.component :as component]
[common-clj.coercion :refer [coerce]]
[common-clj.components.config.protocol :as config.protocol]
[common-clj.components.consumer.protocol
:as
consumer.protocol
:refer
[Consumer]]
[common-clj.lib.kafka :refer [kafka-topic->topic topic->kafka-topic]]
[common-clj.schemata.consumer :as schemata.consumer]
[schema.core :as s])
(:import java.util.Properties
[org.apache.kafka.clients.consumer ConsumerConfig KafkaConsumer]
org.apache.kafka.common.serialization.StringDeserializer))
(defn ^:private consumer-props [config]
(let [{:keys [app-name kafka-server]} (config.protocol/get-config config)
props (Properties.)
props-map [[ConsumerConfig/BOOTSTRAP_SERVERS_CONFIG kafka-server]
[ConsumerConfig/GROUP_ID_CONFIG (str app-name)]
[ConsumerConfig/KEY_DESERIALIZER_CLASS_CONFIG (.getName StringDeserializer)]
[ConsumerConfig/VALUE_DESERIALIZER_CLASS_CONFIG (.getName StringDeserializer)]
[ConsumerConfig/MAX_POLL_RECORDS_CONFIG (int 1)]
[ConsumerConfig/ENABLE_AUTO_COMMIT_CONFIG false]
[ConsumerConfig/AUTO_OFFSET_RESET_CONFIG "earliest"]]]
(doseq [[k v] props-map] (.put props k v))
props))
(defn ^:private consumer-loop
[{:keys [kafka-client] :as component} consumer-topics]
(Thread/sleep 100)
(while true
(let [records (.poll kafka-client 100)]
(doseq [record records]
(let [kafka-topic (.topic record)
topic (kafka-topic->topic kafka-topic)
raw-message (.value record)
message (parse-string raw-message true)]
(consumer.protocol/consume! component topic message))))))
(defn new-kafka-client [props]
(KafkaConsumer. props))
(s/defrecord KafkaConsumerImpl [consumer-topics :- schemata.consumer/ConsumerTopics]
component/Lifecycle
(start [component]
(let [kafka-topics (->> consumer-topics keys (map topic->kafka-topic))
props (consumer-props (:config component))
kafka-client (new-kafka-client props)]
(.subscribe kafka-client kafka-topics)
(let [updated-component (assoc component :kafka-client kafka-client)
loop (Thread. #(consumer-loop updated-component consumer-topics))]
(.start loop)
(assoc updated-component :consumer-thread loop))))
Consumer
(consume! [component topic message]
(let [handler (get-in consumer-topics [topic :handler])
schema (get-in consumer-topics [topic :schema])
coerced-message (coerce schema message)]
(s/validate schema coerced-message)
(handler coerced-message component))))
(s/defn new-consumer [consumer-topics :- schemata.consumer/ConsumerTopics]
(map->KafkaConsumerImpl {:consumer-topics consumer-topics}))