-
Notifications
You must be signed in to change notification settings - Fork 3
/
producers.clj
107 lines (89 loc) · 4.45 KB
/
producers.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
(ns piped.producers
"Code relating to polling SQS messages from AWS and getting them onto channels."
(:require [piped.utils :as utils]
[clojure.core.async :as async]
[cognitect.aws.client.api.async :as api.async]
[clojure.tools.logging :as log]
[clojure.core.async.impl.protocols :as impl]))
(defn spawn-producer
  "Spawns a go-loop that repeatedly long polls the SQS queue at `queue-url`
  through `client` and offers every received message to `output-chan`.

  Each message is given metadata containing :deadline (a timeout channel of
  `(- (* VisibilityTimeout 1000) utils/deadline-safety-buffer)` milliseconds
  created once the poll response arrives), :queue-url and :timeout.

  Arguments:
    client      - aws client handed to `cognitect.aws.client.api.async/invoke`.
    queue-url   - url of the queue to poll.
    output-chan - channel that received messages are put onto, in order.
    nack-chan   - channel that is given the undelivered messages of the current
                  batch when `output-chan` turns out to be closed. Defaults to
                  `(utils/dev-null)`.
    options     - map with optional keys
                    MaxNumberOfMessages (default 10)    batch size of the first poll
                    VisibilityTimeout   (default 30)    sent with every request (seconds)
                    MaxArtificialDelay  (default 60000) passed to `utils/backoff-seq`
                                                        when a poll returns an anomaly

  Outcome of each poll:
    :closed   - `output-chan` is closed; remaining messages go to `nack-chan`
                and the loop ends.
    :error    - the response is an anomaly and nothing was received; waits for
                the next backoff value and polls again.
    :empty    - nothing received; polls again and resets the backoff.
    :expired  - the deadline fired before every message was accepted; the next
                batch size becomes the clamped `utils/average-` of the accepted
                count and the requested count.
    :accepted - every message was accepted; the next batch size becomes the
                clamped `utils/average+` of the received count and
                `utils/maximum-messages-received`.

  Returns the channel of the go-loop, which yields :complete when the producer
  stops because `output-chan` was closed."
  ([client queue-url output-chan]
   (spawn-producer client queue-url output-chan (utils/dev-null)))

  ([client queue-url output-chan nack-chan]
   (spawn-producer client queue-url output-chan nack-chan {}))

  ([client queue-url output-chan nack-chan
    {:keys [MaxNumberOfMessages VisibilityTimeout MaxArtificialDelay]
     :or   {MaxNumberOfMessages 10 VisibilityTimeout 30 MaxArtificialDelay 60000}}]

   (async/go-loop [max-number-of-messages MaxNumberOfMessages backoff-seq []]

     (log/debugf "Beginning new long poll of sqs queue %s." queue-url)

     (let [request
           {:op      :ReceiveMessage
            :request {:QueueUrl              queue-url
                      :MaxNumberOfMessages   max-number-of-messages
                      :VisibilityTimeout     VisibilityTimeout
                      :WaitTimeSeconds       utils/maximum-wait-time-seconds
                      :AttributeNames        ["All"]
                      :MessageAttributeNames ["All"]}}

           ; poll for messages
           response
           (async/<! (api.async/invoke client request))

           original-messages
           (get response :Messages [])

           ; one shared deadline for the whole batch, started when the response arrives
           deadline
           (async/timeout (- (* VisibilityTimeout 1000) utils/deadline-safety-buffer))

           metadata
           {:deadline deadline :queue-url queue-url :timeout VisibilityTimeout}

           messages-with-metadata
           (mapv #(with-meta % metadata) original-messages)

           [action remainder]
           (if (empty? messages-with-metadata)
             ; Check for a closed output channel before looking at errors: once
             ; nobody can receive messages there is no point in retrying, and a
             ; persistently failing client would otherwise keep this loop (and
             ; its error logging) alive forever after shutdown.
             (cond (impl/closed? output-chan)
                   [:closed []]

                   (utils/anomaly? response)
                   [:error []]

                   :else
                   [:empty []])

             ; offer the messages one at a time until the batch is exhausted,
             ; the deadline fires, or the output channel is closed
             (loop [[message :as messages] messages-with-metadata]
               (if (seq messages)
                 (if-some [result
                           (async/alt!
                             ; a put yields false when the channel is closed -> nil -> :closed
                             [[output-chan message]]
                             ([val _] (if val ::accepted nil))
                             [deadline] ::timeout
                             ; prefer delivering over expiring when both are ready
                             :priority true)]
                   (if (= ::accepted result)
                     (recur (rest messages))
                     [:expired (into [] messages)])
                   [:closed (into [] messages)])
                 [:accepted []])))]

       (case action

         :error
         ; continue the current backoff sequence, or start a fresh one on the first failure
         (let [[backoff :as backoff-seq] (or (seq backoff-seq) (utils/backoff-seq MaxArtificialDelay))]
           (log/errorf "Error returned when polling sqs queue %s. Waiting for %d milliseconds. %s" queue-url backoff (pr-str response))
           (async/<! (async/timeout backoff))
           (recur max-number-of-messages (rest backoff-seq)))

         :empty
         (recur max-number-of-messages [])

         :expired
         (let [wanted    max-number-of-messages
               received  (count original-messages)
               accepted  (- received (count remainder))
               new-count (utils/clamp
                           utils/minimum-messages-received
                           utils/maximum-messages-received
                           (utils/average- accepted wanted))]
           (log/warnf "Consumers were unable to accept %d messages from %s before the messages expired." (count remainder) queue-url)
           ; probably just let aws handle it and skip the network traffic since already very near expiry
           #_(async/onto-chan! nack-chan remainder false)
           (recur new-count []))

         :closed
         (do (log/debugf "Producer stopping because channel for queue %s has been closed." queue-url)
             ; hand back whatever was polled but never delivered; nack-chan is left open
             (async/<! (async/onto-chan! nack-chan remainder false))
             :complete)

         :accepted
         (let [received  (count original-messages)
               new-count (utils/clamp
                           utils/minimum-messages-received
                           utils/maximum-messages-received
                           (utils/average+ received utils/maximum-messages-received))]
           (log/debugf "All messages polled from %s were accepted by consumers." queue-url)
           (recur new-count [])))))))