-
Notifications
You must be signed in to change notification settings - Fork 1
/
job.clj
127 lines (104 loc) · 4.26 KB
/
job.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
(ns crank.job
(:require [clojure.tools.logging :as log]
[crank.kafka :as kafka])
(:import [java.time Duration]
[org.apache.kafka.common.errors WakeupException]))
(defn record->message
  "Turn a consumed record into a plain Clojure map.

  `record` is any object exposing zero-argument `topic`, `partition`,
  `offset`, `timestamp`, `key` and `value` members (the elements returned by
  the consumer's poll in `run-loop`). The argument is not type-hinted, so the
  members are looked up reflectively.

  Returns a map with the keys :topic, :partition, :offset, :timestamp, :key
  and :value holding the corresponding member values."
  [record]
  (assoc {}
         :topic (.topic record)
         :partition (.partition record)
         :offset (.offset record)
         :timestamp (.timestamp record)
         :key (.key record)
         :value (.value record)))
(defn messages->offsets
  "Summarise a batch of message maps as an offset range per partition.

  `messages` is a sequence of maps carrying :partition and :offset. The
  result maps each partition to a two-element vector holding the :offset of
  the first and of the last message seen for that partition, in the order
  the messages appear in the input. An empty or nil input yields an empty
  map."
  [messages]
  (reduce-kv (fn [ranges part msgs]
               (assoc ranges part [(:offset (first msgs))
                                   (:offset (last msgs))]))
             {}
             (group-by :partition messages)))
(defn run-loop
  "Poll `consumer` and feed the received messages to the job function until
  the job is stopped or fails. Runs on the calling thread.

  Arguments:
    consumer - the consumer to drive; this function subscribes it, polls it,
               commits on it and always closes it before returning/throwing.
    stop     - a dereferenceable flag (e.g. an atom); a truthy value requests
               that the loop terminate.
    config   - map with the keys
                 :job-name    name used in reports and log lines
                 :func        called with each message map, or with the whole
                              vector of message maps when :batch? is truthy
                 :send-report called with a report map for every event
                 :topics      passed to the consumer's subscribe
                 :attempts    included in the :start report (default 0)
                 :batch?      selects batch vs. per-message processing

  Reports sent through `send-report` carry :time, :job-name, :job-id (the id
  of the current thread) and a :type of :start, :poll, :batch, :message,
  :stop or :exception.

  Offsets are committed synchronously after a non-empty poll result has been
  fully processed and no stop was requested.

  Returns nil after a requested stop. Any other exception is logged, reported
  as :exception and rethrown."
  [consumer stop {:keys [job-name func send-report topics attempts]
                  :or {attempts 0}
                  :as config}]
  (let [job-id (.getId (Thread/currentThread))]
    (send-report {:time (System/currentTimeMillis)
                  :job-name job-name
                  :job-id job-id
                  :type :start
                  :attempts attempts})
    (try
      ;; Subscribe inside the try so that a failing subscribe is reported and
      ;; the consumer is still closed by the handlers below.
      (.subscribe consumer topics)
      (loop [messages nil
             previous (System/currentTimeMillis)]
        ;; messages are at least an empty vector, except for the first time
        (when-not (nil? messages)
          (send-report {:time (System/currentTimeMillis)
                        :job-name job-name
                        :job-id job-id
                        :type :poll
                        :count (count messages)}))
        (if (:batch? config)
          ;; Batch mode: hand over the whole poll result at once.
          (when (seq messages)
            (func messages)
            (let [now (System/currentTimeMillis)]
              (send-report {:time now
                            :duration (- now previous)
                            :job-name job-name
                            :job-id job-id
                            :type :batch
                            :topic (:topic (first messages))
                            :offsets (messages->offsets messages)})))
          ;; Per-message mode: check the stop flag before every message so a
          ;; stop request does not have to wait for the rest of the batch.
          (doseq [message messages]
            (when @stop
              (throw (ex-info "stop iteration" {:stop true})))
            (func message)
            ;; :duration is measured from the time the poll returned, so it
            ;; grows across the messages of one poll result.
            (let [now (System/currentTimeMillis)]
              (send-report {:time now
                            :duration (- now previous)
                            :job-name job-name
                            :job-id job-id
                            :type :message
                            :topic (:topic message)
                            :offset (:offset message)
                            :partition (:partition message)}))))
        (if @stop
          (throw (ex-info "stop job" {:stop true}))
          (do
            (when (seq messages)
              (.commitSync consumer))
            ;; Use the Duration overload: the deprecated poll(long) may block
            ;; longer than the requested timeout.
            (recur (->> (.poll consumer (Duration/ofMillis 100))
                        (mapv record->message))
                   (System/currentTimeMillis)))))
      (catch Exception e
        ;; A stop is either our own ex-info marker, or a wakeup/interrupt that
        ;; arrived while the stop flag was set.
        (if (or (:stop (ex-data e))
                (and @stop
                     (or (instance? WakeupException e)
                         (instance? InterruptedException e))))
          (do
            (send-report {:time (System/currentTimeMillis)
                          :job-name job-name
                          :job-id job-id
                          :type :stop})
            (log/infof "stopping job %s" job-name))
          (do
            (log/errorf e "job %s died" job-name)
            (send-report {:time (System/currentTimeMillis)
                          :job-name job-name
                          :job-id job-id
                          :type :exception
                          :exception e})
            ;; Close without waiting before propagating the failure.
            (.close consumer Duration/ZERO)
            (throw e))))
      (finally
        (.close consumer)
        (log/debugf "job %s loop exiting" job-name)))))
(defn start-job
  "Start a job on a new thread and return a handle for it.

  `config` is a map; :kafka is passed to `kafka/make-consumer`, and
  :monitor-name and :job-name are combined with the thread id to name the
  worker thread. The whole map is forwarded to `run-loop`.

  Returns a map with
    :config - the config that was passed in
    :worker - the started Thread running `run-loop`
    :report - an empty vector
    :stop!  - a zero-argument function that sets the stop flag and wakes up
              the consumer"
  [{:keys [kafka monitor-name job-name] :as config}]
  (log/infof "starting job %s" job-name)
  (let [consumer (kafka/make-consumer kafka)
        stop (atom false)
        worker (Thread. (fn [] (run-loop consumer stop config)))
        halt! (fn []
                (reset! stop true)
                (.wakeup consumer))]
    (doto worker
      (.setName (str monitor-name "-" job-name "-" (.getId worker)))
      (.start))
    {:config config
     :worker worker
     :report []
     :stop! halt!}))