-
Notifications
You must be signed in to change notification settings - Fork 3
/
consumers.clj
103 lines (87 loc) · 4.05 KB
/
consumers.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
(ns piped.consumers
"Code relating to reading SQS messages from channels and processing them."
(:require [clojure.core.async :as async]
[clojure.tools.logging :as log]
[piped.utils :as utils]
[piped.sqs :as sqs]))
(defn- make-consumer
  "Starts a go-loop that handles one message at a time.

  When nothing is in flight, a message is taken from `input-chan` and handed
  to `message-fn`, whose return value is treated as a channel (the task). The
  loop then waits on the task channel and on the channel produced by
  `utils/message->deadline`, preferring the task when both are ready.

  Outcomes:
    nil     - the task channel closed without a value; the message is neither
              acked nor nacked and the loop moves on to the next message
    :ack    - the message is put on `ack-chan`
    :nack   - the message is put on `nack-chan`
    :extend - the deadline fired first; the visibility timeout is doubled via
              `sqs/change-visibility-one` and the loop keeps waiting on the
              same task with an updated deadline/timeout on the message

  Any other task value is not handled (the `case` has no default clause).
  The returned go channel yields :complete once `input-chan` is closed."
  [client input-chan ack-chan nack-chan message-fn]
  (async/go-loop [inflight nil
                  pending  nil]
    (if (or (some? inflight) (some? pending))
      ;; a message is being processed: wait for its outcome or its deadline
      (let [expiry  (utils/message->deadline inflight)
            outcome (async/alt!
                      [pending] ([value] value)
                      [expiry] :extend
                      :priority true)]
        (case outcome
          nil
          (recur nil nil)

          (:ack :nack)
          (do
            (async/>! (if (= :ack outcome) ack-chan nack-chan) inflight)
            (recur nil nil))

          :extend
          ;; The message is still in flight, so its visibility must be pushed
          ;; out right away. This is deliberately not batched: any delay risks
          ;; the message becoming visible to other consumers.
          (let [current (utils/message->timeout inflight)
                doubled (* 2 current)
                _       (log/infof "Extending visibility for inflight message %s from %d to %d seconds."
                                   (:MessageId inflight)
                                   current
                                   doubled)
                reply   (async/<! (sqs/change-visibility-one client inflight doubled))]
            (when (utils/anomaly? reply)
              (log/error "Error extending visibility timeout of inflight message." (pr-str reply)))
            ;; the local deadline/timeout are updated whether or not the call succeeded
            (recur (utils/with-timeout
                     (utils/with-deadline inflight (* doubled 1000))
                     doubled)
                   pending))))
      ;; idle: pull the next message, or finish when the input is closed
      (let [incoming (async/<! input-chan)]
        (if (nil? incoming)
          :complete
          (recur incoming (message-fn incoming)))))))
(defn- ->processor
  "Wraps `processor-fn` so that the resulting function always answers with
  either :ack or :nack and never lets an Exception escape.

    - a return value of :nack is passed through as :nack
    - every other return value (including :ack and nil) becomes :ack
    - a thrown Exception is logged and reported as :nack"
  [processor-fn]
  (fn [msg]
    (try
      ;; only an explicit :nack survives; anything else counts as success
      (case (processor-fn msg)
        :nack :nack
        :ack)
      (catch Exception e
        (log/error e "Exception processing sqs message.")
        :nack))))
(defn spawn-consumer-async
  "Spawns a consumer fit for cpu bound or asynchronous tasks. Uses the core.async dispatch thread pool.

    :client     - an aws-api sqs client instance
    :input-chan - a channel of incoming sqs messages
    :ack-chan   - a channel that accepts messages that should be acked
    :nack-chan  - a channel that accepts messages that should be nacked
    :processor  - a function of a message that either returns a result directly
                  or may return a core.async channel that emits once (like a
                  promise chan) when finished processing the message. Must not
                  block.

  How the processor's outcome is interpreted:
    - :nack (returned directly or emitted by the channel) nacks the message
    - any other value, including a directly returned nil, acks the message
    - an Exception thrown by the processor is logged and nacks the message
    - a returned channel that closes without emitting produces no outcome:
      the message is neither acked nor nacked

  Returns the channel of the underlying consumer loop."
  [client input-chan ack-chan nack-chan processor]
  (make-consumer client input-chan ack-chan nack-chan
                 (let [lifted (->processor identity)]
                   (fn [msg]
                     ;; async/map normalizes whatever the go block yields to :ack / :nack
                     (async/map
                       lifted
                       [(async/go
                          (try
                            (let [result (processor msg)]
                              (if (utils/channel? result)
                                (async/<! result)
                                ;; A go block that returns nil just closes its channel,
                                ;; so `lifted` would never run and the message would be
                                ;; silently dropped without an ack. Treat a plain nil
                                ;; return as success, the same way the blocking
                                ;; consumer does.
                                (if (nil? result)
                                  :ack
                                  result)))
                            (catch Exception e
                              (log/error e "Exception processing sqs message in async consumer.")
                              :nack)))])))))
(defn spawn-consumer-blocking
  "Spawns a consumer fit for synchronous blocking tasks. Each message is
  processed on its own thread via `async/thread`.

    :client     - an aws-api sqs client instance
    :input-chan - a channel of incoming sqs messages
    :ack-chan   - a channel that accepts messages that should be acked
    :nack-chan  - a channel that accepts messages that should be nacked
    :processor  - a function of a message that may perform blocking side
                  effects with the message

  The processor is wrapped with `->processor`, so its outcome is always
  reduced to :ack or :nack before it reaches the consumer loop."
  [client input-chan ack-chan nack-chan processor]
  (let [safe-processor (->processor processor)
        ;; run the wrapped processor off the dispatch pool, one thread per message
        run-on-thread  (fn [msg]
                         (async/thread (safe-processor msg)))]
    (make-consumer client input-chan ack-chan nack-chan run-on-thread)))