-
Notifications
You must be signed in to change notification settings - Fork 3
/
sqs.clj
64 lines (56 loc) · 2.93 KB
/
sqs.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
(ns piped.sqs
"Functions relating to interacting with SQS."
(:require [cognitect.aws.client.api.async :as api.async]
[clojure.core.async :as async]
[piped.utils :as utils]))
(defn- combine-batch-results
  "Collapses a collection of response channels into a single channel.

  When result-chans holds exactly one channel, that channel is returned as-is,
  so the caller receives the raw response. Otherwise returns a go-loop channel
  that takes one value from every channel (in whatever order they become
  ready) and yields a map {:Successful [...] :Failed [...]} holding the
  concatenation of those keys across all responses. A response that lacks
  these keys (including nil) contributes nothing to the combined result."
  [result-chans]
  (if-not (= 1 (bounded-count 2 result-chans))
    (async/go-loop [pending (set result-chans)
                    combined {:Successful [] :Failed []}]
      (if (seq pending)
        ;; Take exactly one value per channel, then stop watching that channel.
        (let [[response source] (async/alts! (vec pending))
              successes (:Successful response [])
              failures (:Failed response [])]
          (recur (disj pending source)
                 (-> combined
                     (update :Successful into successes)
                     (update :Failed into failures))))
        combined))
    (first result-chans)))
(defn change-visibility-one
  "Issues a :ChangeMessageVisibility operation for a single message.

  The queue url is derived from the message via utils/message->queue-url and
  the message's :ReceiptHandle identifies the delivery to modify.
  visibility-timeout is passed through as :VisibilityTimeout.

  Returns the result of api.async/invoke unchanged."
  [client {:keys [ReceiptHandle] :as message} visibility-timeout]
  (api.async/invoke
    client
    {:op      :ChangeMessageVisibility
     :request {:QueueUrl          (utils/message->queue-url message)
               :ReceiptHandle     ReceiptHandle
               :VisibilityTimeout visibility-timeout}}))
(defn change-visibility-batch
  "Changes the visibility timeout of many messages using
  :ChangeMessageVisibilityBatch operations.

  Messages are grouped by queue url (utils/message->queue-url). Within each
  queue the messages are walked in reverse order and de-duplicated on
  MessageId via utils/distinct-by (presumably keeping the first entry seen,
  i.e. the last occurrence in the input -- confirm against utils/distinct-by).
  The entries for a queue are then split into requests of at most 10 entries,
  because SQS rejects batch requests carrying more than 10 entries.

  Returns the result of combine-batch-results applied to the responses of all
  issued requests: the raw response channel when exactly one request was
  issued, otherwise a channel yielding the merged :Successful / :Failed
  entries."
  [client messages visibility-timeout]
  (let [max-batch-entries 10] ; SQS hard limit on entries per batch request
    (->> (for [[queue-url queue-messages] (group-by utils/message->queue-url messages)
               entries (->> (for [{:keys [MessageId ReceiptHandle]} (rseq queue-messages)]
                              {:Id                MessageId
                               :ReceiptHandle     ReceiptHandle
                               :VisibilityTimeout visibility-timeout})
                            (utils/distinct-by :Id)
                            (partition-all max-batch-entries))]
           (api.async/invoke client {:op      :ChangeMessageVisibilityBatch
                                     :request {:QueueUrl queue-url
                                               :Entries  (vec entries)}}))
         (combine-batch-results))))
(defn ack-one
  "Issues a :DeleteMessage operation for a single message.

  The queue url is derived from the message via utils/message->queue-url and
  the message's :ReceiptHandle identifies the delivery to delete.

  Returns the result of api.async/invoke unchanged."
  [client {:keys [ReceiptHandle] :as message}]
  (api.async/invoke
    client
    {:op      :DeleteMessage
     :request {:QueueUrl      (utils/message->queue-url message)
               :ReceiptHandle ReceiptHandle}}))
(defn ack-many
  "Deletes many messages using :DeleteMessageBatch operations.

  Messages are grouped by queue url (utils/message->queue-url). Within each
  queue the messages are walked in reverse order and de-duplicated on
  MessageId via utils/distinct-by (presumably keeping the first entry seen,
  i.e. the last occurrence in the input -- confirm against utils/distinct-by).
  The entries for a queue are then split into requests of at most 10 entries,
  because SQS rejects batch requests carrying more than 10 entries.

  Returns the result of combine-batch-results applied to the responses of all
  issued requests: the raw response channel when exactly one request was
  issued, otherwise a channel yielding the merged :Successful / :Failed
  entries."
  [client messages]
  (let [max-batch-entries 10] ; SQS hard limit on entries per batch request
    (->> (for [[queue-url queue-messages] (group-by utils/message->queue-url messages)
               entries (->> (for [{:keys [MessageId ReceiptHandle]} (rseq queue-messages)]
                              {:Id            MessageId
                               :ReceiptHandle ReceiptHandle})
                            (utils/distinct-by :Id)
                            (partition-all max-batch-entries))]
           (api.async/invoke client {:op      :DeleteMessageBatch
                                     :request {:QueueUrl queue-url
                                               :Entries  (vec entries)}}))
         (combine-batch-results))))
(defn nack-one
  "Negatively acknowledges a single message by delegating to
  change-visibility-one with a visibility timeout of 0.

  Returns whatever change-visibility-one returns."
  [client message]
  (let [zero-timeout 0]
    (change-visibility-one client message zero-timeout)))
(defn nack-many
  "Negatively acknowledges many messages by delegating to
  change-visibility-batch with a visibility timeout of 0.

  Returns whatever change-visibility-batch returns."
  [client messages]
  (let [zero-timeout 0]
    (change-visibility-batch client messages zero-timeout)))