/
core.clj
112 lines (96 loc) · 3.75 KB
/
core.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
(ns grete.core
(:require [clojure.tools.logging :as log]
[clojure.string :as s]
[gregor.core :as gregor]
[grete.scheduler :as sch]))
(defn to-prop [k]
(-> k name (s/replace #"-" ".")))
(defn to-props
"ranames keys by converting them to strings and substituting dashes with periods
only does top level keys"
[conf]
(into {}
(for [[k v] conf]
[(to-prop k) v])))
;; TODO: waiting on gregor fix for: https://github.com/apache/kafka/pull/5480
#_(defn create-topic [{:keys [name] :as topic}
{:keys [zookeeper]}]
(let [zoo {:connection-string (zookeeper :hosts)}]
(when-not (gregor/topic-exists? zoo name)
(gregor/create-topic zoo name topic))))
(defn producer [topic {:keys [bootstrap-servers] :as conf}]
(let [serializers (-> (select-keys conf
[:key-serializer :value-serializer])
to-props)]
; (create-topic topic conf)
{:producer (gregor/producer bootstrap-servers
serializers)
:topic topic}))
(defn send!
([{:keys [producer topic]} msg]
(gregor/send producer topic msg))
([{:keys [producer topic]} key msg]
(gregor/send producer topic key msg)))
(defn close [{:keys [producer]}]
(gregor/close producer))
(defn- edn-to-consumer [{:keys [bootstrap-servers
group-id
topics] :as conf}]
[bootstrap-servers
group-id
topics
(to-props (dissoc conf :topics))])
;; consuming..
(defn consumer-records->maps [cs]
(-> (map gregor/consumer-record->map cs)
seq))
(defn poll
"fetches sequetially from the last consumed offset
return 'org.apache.kafka.clients.consumer.ConsumerRecords' currently available to the consumer (via a single poll)
if a 'timeout' param is 0, returns immediately with any records that are available now."
([consumer] (poll consumer 100))
([consumer timeout]
(.poll consumer timeout)))
(defn consumer [conf]
(log/info "consumer config:" (edn-to-consumer conf))
(->> (edn-to-consumer conf)
(apply gregor/consumer)))
(defn consume
"the 'process' function will take 'org.apache.kafka.clients.consumer.ConsumerRecords'
which can be turns to a seq of maps with 'consumer-records->maps'"
[consumer process running? ms n]
(log/info "starting" (inc n) "consumer")
(while @running?
(try
(let [consumer-records (poll consumer ms)]
(when consumer-records
(process consumer consumer-records)
(gregor/commit-offsets! consumer)))
(catch Throwable t
(log/error "kafka: could not consume a message" t))))
(gregor/close consumer))
(defn run-consumers [process {:keys [threads poll-ms] :as conf}]
(let [running? (atom true)
pool (sch/new-executor "kafka consumers" (if (number? threads)
threads
42))]
(dotimes [t threads]
(let [c (consumer (dissoc conf :threads :poll-ms))]
(log/info "subscribing to:" (gregor/subscription c))
(.submit pool #(consume c process running? poll-ms t))))
(log/info "started" threads "consumers ->" conf)
{:pool pool :running? running?}))
(defn stop-consumers [{:keys [pool running?]}]
(reset! running? false)
(.shutdown pool))
(defn offsets [c]
(for [tp (gregor/assignment c)]
(let [p (.partition tp)
t (.topic tp)]
{:topic t :partition p :offset (gregor/committed c t p)})))
(defn reset-offsets [c topic pnum]
(let [offsets (reduce (fn [ofs p]
(conj ofs {:topic topic
:partition p
:offset 0})) [] (range pnum))]
(gregor/commit-offsets! c offsets)))