-
Notifications
You must be signed in to change notification settings - Fork 0
/
sequential.clj
27 lines (22 loc) · 1.04 KB
/
sequential.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
(ns sqs-consumer.sequential
(:require [amazonica.aws.sqs :as sqs]
[sqs-consumer.core :as core]
[sqs-consumer.utils :as utils]))
(defn delete-message
  "Deletes a single message by calling `sqs/delete-message`.

  `consumer-config` is a map from which `:queue-url` and `:aws-config` are
  read; `receipt-handle` identifies the message to delete. The arguments are
  forwarded as `aws-config`, `queue-url`, `receipt-handle`, and the result of
  the underlying call is returned unchanged."
  [consumer-config receipt-handle]
  (let [{:keys [queue-url aws-config]} consumer-config]
    (sqs/delete-message aws-config queue-url receipt-handle)))
(defn sequential-process
  "Adapts a per-message `process-fn` into a handler for a batch of messages.

  The returned function takes a map with `:config` and `:messages` and calls
  `process-fn` once per message, in order, purely for side effects (the
  handler itself returns nil). Each call receives a map with:

    :message-body    the message's `:body`
    :delete-message  a zero-argument function that calls `delete-message`
                     with the batch `:config` and the message's
                     `:receipt-handle`"
  [process-fn]
  (fn [batch]
    (let [{:keys [config messages]} batch
          handle-one (fn [msg]
                       (process-fn
                        {:message-body   (:body msg)
                         :delete-message (fn []
                                           (delete-message config (:receipt-handle msg)))}))]
      ;; `run!` walks the messages one at a time and discards the results.
      (run! handle-one messages))))
(defn with-auto-delete
  "Middleware that deletes a message after it has been processed successfully.

  Wraps `process-fn` so that it is called with the incoming map minus its
  `:delete-message` entry. When `process-fn` returns a truthy value, the
  `:delete-message` function from the incoming map is invoked with no
  arguments and its result is returned; otherwise nothing is deleted and nil
  is returned."
  [process-fn]
  (fn [message]
    (let [{delete-fn :delete-message} message
          payload                     (dissoc message :delete-message)]
      (if (process-fn payload)
        (delete-fn)
        nil))))
(defn with-decoder
  "Middleware that decodes the message payload before handing it to `process-fn`.

  The returned function takes a map, applies `decoder` to its payload and
  calls `process-fn` with the decoded value, returning `process-fn`'s result.

  The payload is read from `:message` when that entry is non-nil (the original
  behavior). Otherwise it falls back to `:message-body`, which is the key
  `sequential-process` in this namespace actually puts on the per-message map;
  without the fallback the decoder would always receive nil when composed with
  `sequential-process` / `with-auto-delete`.

  Note: only the decoded value is passed on, so any other entries of the
  incoming map (for example `:delete-message`) are not visible to `process-fn`."
  [process-fn decoder]
  (fn [{:keys [message message-body]}]
    (let [payload (if (some? message) message message-body)]
      (process-fn (decoder payload)))))
;; Alias that exposes `utils/with-error-handler` from this namespace under the
;; name `with-error-handling`. Its arguments and behavior are defined in
;; `sqs-consumer.utils`.
(def with-error-handling utils/with-error-handler)
;; TODO: this should also wrap `sequential-process`
(defn create-consumer
  "Creates a consumer whose messages are handled one at a time.

  Takes keyword options. The `:process-fn` option is wrapped with
  `sequential-process`, and the resulting option map (all other options
  unchanged) is passed to `core/create-consumer`, whose result is returned."
  [& {:keys [process-fn] :as args}]
  (let [batch-handler (sequential-process process-fn)
        consumer-args (merge args {:process-fn batch-handler})]
    (core/create-consumer consumer-args)))