-
Notifications
You must be signed in to change notification settings - Fork 3
/
core.clj
207 lines (179 loc) · 8.3 KB
/
core.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
(ns piped.core
"The public API."
(:require [clojure.core.async :as async]
[piped.consumers :as consumers]
[piped.producers :as producers]
[piped.actions :as actions]
[piped.utils :as utils]
[cognitect.aws.client.api :as aws]
[cognitect.aws.http :as http]
[cognitect.http-client :as impl]
[piped.specs :as specs]
[clojure.tools.logging :as log]
[aws-api-credential-providers.core :as cp]))
(defprotocol PipedProcessor
:extend-via-metadata true
(start [this] "Start polling SQS and processing messages.")
(stop [this] "Stop the system and await completion of in-flight messages.")
(running? [this] "Is the system currently running?"))
; registry
(defonce processors (atom {}))
(defn get-all-processors
"Returns all registered systems in no particular order."
[]
(or (vals @processors) ()))
(defn get-processor-by-queue-url
"Gets the system for a given queue url. Returns nil if there is no such system."
[queue-url]
(get @processors queue-url))
(defn start-processor-by-queue-url!
"For a given queue-url, start the associated system (if any)."
[queue-url]
(some-> queue-url (get-processor-by-queue-url) (start)))
(defn stop-processor-by-queue-url!
"For a given queue-url, stop the associated system (if any)."
[queue-url]
(some-> queue-url (get-processor-by-queue-url) (stop)))
(defn start-all-processors!
"Stop all running systems."
[]
(run! start (get-all-processors)))
(defn stop-all-processors!
"Stop all running systems. Systems are stopped concurrently
for a faster return but this function blocks until they have
all been fully shutdown."
[]
(run! deref (doall (map #(future (stop %)) (get-all-processors)))))
(defn http-client
"Returns a http client using cognitect's async jetty client wrapper."
[http-opts]
(let [c (impl/create http-opts)]
(reify http/HttpClient
(-submit [_ request channel]
(impl/submit c request channel))
(-stop [_]
(impl/stop c)))))
(defn processor
"Spawns a set of producers and consumers for a given queue.
Returns an implementation of the PipedProcessor protocol which
represents a message processing machine that can be started and
stopped."
[{:keys [client-opts
queue-url
consumer-fn
producer-parallelism
consumer-parallelism
acker-parallelism
nacker-parallelism
blocking-consumers
transform]
:as opts}]
(specs/assert-options opts)
(letfn [(launch []
(let [consumer-parallelism (or consumer-parallelism 10)
blocking-consumers (if (boolean? blocking-consumers) blocking-consumers true)
producer-parallelism (or producer-parallelism (utils/quot+ consumer-parallelism 10))
acker-parallelism (or acker-parallelism producer-parallelism)
nacker-parallelism (or nacker-parallelism producer-parallelism)
max-http-ops (+ producer-parallelism acker-parallelism nacker-parallelism)
http-client (delay (http-client
{:pending-ops-limit max-http-ops
:max-connections-per-destination max-http-ops}))
credentials-provider (delay (cp/default-credentials-provider
(or (:http-client client-opts) (force http-client))))
client (cond-> (or client-opts {})
(not (contains? client-opts :http-client))
(assoc :http-client (force http-client))
(not (contains? client-opts :credentials-provider))
(assoc :credentials-provider (force credentials-provider))
:always
(assoc :api :sqs)
:always
(aws/client))
transform (if transform
(fn [msg]
(try
(transform msg)
(catch Exception e
(log/error e "Error in transformer.")
msg)))
identity)
acker-chan (async/chan)
nacker-chan (async/chan)
pipe (async/chan)
transformed (async/map transform [pipe])
acker-batched (utils/deadline-batching acker-chan 10 utils/message->deadline)
nacker-batched (utils/interval-batching nacker-chan 5000 10)]
(letfn [(spawn-producer []
(let [opts {:MaxNumberOfMessages (min 10 consumer-parallelism)}]
(producers/spawn-producer client queue-url pipe nacker-chan opts)))
(spawn-consumer []
(if blocking-consumers
(consumers/spawn-consumer-blocking client transformed acker-chan nacker-chan consumer-fn)
(consumers/spawn-consumer-async client transformed acker-chan nacker-chan consumer-fn)))
(spawn-acker []
(actions/spawn-acker client acker-batched))
(spawn-nacker []
(actions/spawn-nacker client nacker-batched))]
{:client client
:transform transform
:acker-chan acker-chan
:nacker-chan nacker-chan
:pipe pipe
:transformed transformed
:acker-batched acker-batched
:nacker-batched nacker-batched
:producers (doall (repeatedly producer-parallelism spawn-producer))
:consumers (doall (repeatedly consumer-parallelism spawn-consumer))
:ackers (doall (repeatedly acker-parallelism spawn-acker))
:nackers (doall (repeatedly nacker-parallelism spawn-nacker))})))]
(let [state
(atom (delay (launch)))
shutdown-thread
(Thread.
^Runnable
(fn []
(when (realized? (deref state))
(let [{:keys [pipe
acker-chan
nacker-chan
producers
consumers
ackers
nackers
client]} (force (deref state))]
; signal producers and consumers
(async/close! pipe)
; wait for producers to exit
(run! async/<!! producers)
; wait for consumers to exit
(run! async/<!! consumers)
; signal ackers
(async/close! acker-chan)
; wait for ackers to exit
(run! async/<!! ackers)
; signal nackers
(async/close! nacker-chan)
; wait for nackers to exit
(run! async/<!! nackers)
; close any http resources
(aws/stop client)))))
system
(reify PipedProcessor
(start [this]
(let [it (deref state)]
(when-not (realized? it)
(.addShutdownHook (Runtime/getRuntime) shutdown-thread)
(force it)))
this)
(running? [this]
(realized? (deref state)))
(stop [this]
(let [it (deref state)]
(when (realized? it)
(.removeShutdownHook (Runtime/getRuntime) shutdown-thread)
(.run shutdown-thread)
(reset! state (delay (launch)))))
this))]
(swap! processors assoc queue-url system)
system)))